Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 84 additions & 31 deletions LWA_EPIC/LWA_EPIC.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@
from lsl.reader.ldp import TBNFile, TBFFile
from lsl.common.stations import lwa1, lwasv

#Optimized Bifrost blocks for EPIC
from bifrost.VGrid import VGrid
from bifrost.XGrid import XGrid
from bifrost.aCorr import aCorr

# some py2/3 compatibility
if sys.version_info.major < 3:
range = xrange # noqa
Expand Down Expand Up @@ -742,8 +747,8 @@ def main(self):
ishape = (self.ntime_gulp, nchan, nstand, npol)
ogulp_size = self.ntime_gulp * self.nchan_out * nstand * self.npol_out * 1 # ci4
oshape = (self.ntime_gulp, self.nchan_out, nstand, self.npol_out)
self.iring.resize(igulp_size)
self.oring.resize(ogulp_size) # , obuf_size)
self.iring.resize(igulp_size, buffer_factor= 8)
self.oring.resize(ogulp_size, buffer_factor= 128) # , obuf_size)

ohdr = ihdr.copy()
ohdr["nchan"] = self.nchan_out
Expand Down Expand Up @@ -978,8 +983,8 @@ def main(self):

oshape = (1, nchan, npol ** 2, self.grid_size, self.grid_size)
ogulp_size = nchan * npol ** 2 * self.grid_size * self.grid_size * 8
self.iring.resize(igulp_size)
self.oring.resize(ogulp_size, buffer_factor=5)
self.iring.resize(igulp_size, buffer_factor=128)
self.oring.resize(ogulp_size, buffer_factor=256)
prev_time = time.time()
with oring.begin_sequence(time_tag=iseq.time_tag, header=ohdr_str) as oseq:
iseq_spans = iseq.read(igulp_size)
Expand Down Expand Up @@ -1035,18 +1040,24 @@ def main(self):
# Grid the Antennas
if self.benchmark is True:
timeg1 = time.time()

try:
bf_romein.execute(udata, gdata)
bf_vgrid.execute(udata, gdata)
except NameError:
bf_romein = Romein()
bf_romein.init(self.locs, gphases, self.grid_size, polmajor=False)
bf_romein.execute(udata, gdata)
bf_vgrid = VGrid()
bf_vgrid.init(self.locs, gphases, self.grid_size, polmajor=False)
bf_vgrid.execute(udata, gdata)

#try:
# bf_romein.execute(udata, gdata)
#except NameError:
# bf_romein = Romein()
# bf_romein.init(self.locs, gphases, self.grid_size, polmajor=False)
# bf_romein.execute(udata, gdata)
Comment on lines +1050 to +1055
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove block comments

Suggested change
#try:
# bf_romein.execute(udata, gdata)
#except NameError:
# bf_romein = Romein()
# bf_romein.init(self.locs, gphases, self.grid_size, polmajor=False)
# bf_romein.execute(udata, gdata)

gdata = gdata.reshape(self.ntime_gulp * nchan * npol, self.grid_size, self.grid_size)
# gdata = self.LinAlgObj.matmul(1.0, udata, bfantgridmap, 0.0, gdata)
if self.benchmark is True:
timeg2 = time.time()
print(" Romein time: %f" % (timeg2 - timeg1))
print(" Grid time: %f" % (timeg2 - timeg1))

# Inverse transform

Expand Down Expand Up @@ -1135,19 +1146,35 @@ def main(self):
)

# Cross multiply to calculate autocorrs
bifrost.map(
"a(i,j,k,l) += (b(i,j,k,l/2) * b(i,j,k,l%2).conj())",
{"a": autocorrs, "b": udata, "t": self.ntime_gulp},
axis_names=("i", "j", "k", "l"),
shape=(self.ntime_gulp, nchan, nstand, npol ** 2),
#bifrost.map(
# "a(i,j,k,l) += (b(i,j,k,l/2) * b(i,j,k,l%2).conj())",
# {"a": autocorrs, "b": udata, "t": self.ntime_gulp},
# axis_names=("i", "j", "k", "l"),
# shape=(self.ntime_gulp, nchan, nstand, npol ** 2),
#)
Comment on lines +1149 to +1154
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#bifrost.map(
# "a(i,j,k,l) += (b(i,j,k,l/2) * b(i,j,k,l%2).conj())",
# {"a": autocorrs, "b": udata, "t": self.ntime_gulp},
# axis_names=("i", "j", "k", "l"),
# shape=(self.ntime_gulp, nchan, nstand, npol ** 2),
#)

try:
bf_auto.execute(udata, autocorrs)
except NameError:
bf_auto = aCorr()
bf_auto.init(self.locs, polmajor=False)
bf_auto.execute(udata, autocorrs)
autocorrs = autocorrs.reshape(
self.ntime_gulp, nchan, nstand, npol ** 2
)

bifrost.map(
"a(i,j,p,k,l) += b(0,i,j,p/2,k,l)*b(0,i,j,p%2,k,l).conj()",
{"a": crosspol, "b": gdata},
axis_names=("i", "j", "p", "k", "l"),
shape=(self.ntime_gulp, nchan, npol ** 2, self.grid_size, self.grid_size),
)

#bifrost.map(
# "a(i,j,p,k,l) += b(0,i,j,p/2,k,l)*b(0,i,j,p%2,k,l).conj()",
# {"a": crosspol, "b": gdata},
# axis_names=("i", "j", "p", "k", "l"),
# shape=(self.ntime_gulp, nchan, npol ** 2, self.grid_size, self.grid_size),
#)
Comment on lines +1166 to +1171
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#bifrost.map(
# "a(i,j,p,k,l) += b(0,i,j,p/2,k,l)*b(0,i,j,p%2,k,l).conj()",
# {"a": crosspol, "b": gdata},
# axis_names=("i", "j", "p", "k", "l"),
# shape=(self.ntime_gulp, nchan, npol ** 2, self.grid_size, self.grid_size),
#)

try:
bf_gmul.execute(gdata, crosspol)
except NameError:
bf_gmul = XGrid()
bf_gmul.init(self.grid_size, polmajor=False)
bf_gmul.execute(gdata, crosspol)
crosspol = crosspol.reshape(
self.ntime_gulp, nchan, npol ** 2, self.grid_size, self.grid_size
)
Expand All @@ -1168,14 +1195,22 @@ def main(self):
autocorr_g = autocorr_g.reshape(
1, nchan, npol ** 2, self.grid_size, self.grid_size
)
#try:
# bf_romein_autocorr.execute(autocorrs_av, autocorr_g)
#except NameError:
# bf_romein_autocorr = Romein()
# bf_romein_autocorr.init(
# autocorr_lo, autocorr_il, self.grid_size, polmajor=False
# )
# bf_romein_autocorr.execute(autocorrs_av, autocorr_g)
Comment on lines +1198 to +1205
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#try:
# bf_romein_autocorr.execute(autocorrs_av, autocorr_g)
#except NameError:
# bf_romein_autocorr = Romein()
# bf_romein_autocorr.init(
# autocorr_lo, autocorr_il, self.grid_size, polmajor=False
# )
# bf_romein_autocorr.execute(autocorrs_av, autocorr_g)

try:
bf_romein_autocorr.execute(autocorrs_av, autocorr_g)
bf_vgrid_autocorr.execute(autocorrs_av, autocorr_g)
except NameError:
bf_romein_autocorr = Romein()
bf_romein_autocorr.init(
bf_vgrid_autocorr = VGrid()
bf_vgrid_autocorr.init(
autocorr_lo, autocorr_il, self.grid_size, polmajor=False
)
bf_romein_autocorr.execute(autocorrs_av, autocorr_g)
bf_vgrid_autocorr.execute(autocorrs_av, autocorr_g)
autocorr_g = autocorr_g.reshape(1 * nchan * npol ** 2, self.grid_size, self.grid_size)
# autocorr_g = romein_float(autocorrs_av,autocorr_g,autocorr_il,autocorr_lx,autocorr_ly,autocorr_lz,self.ant_extent,self.grid_size,nstand,nchan*npol**2)
# Inverse FFT
Expand Down Expand Up @@ -2096,6 +2131,13 @@ def gen_args(return_parser=False):
default=1000,
help="How many milliseconds to accumulate an image over",
)
group3.add_argument(
"--duration",
type=int,
default=3600,
help="Duration of EPIC (seconds)",
)


group4 = parser.add_argument_group("Correlation Options")
group4.add_argument(
Expand Down Expand Up @@ -2165,16 +2207,17 @@ def gen_args(return_parser=False):
print("Output directory does not exist. Defaulting to current directory.")
args.out_dir = "."

if args.removeautocorrs:
raise NotImplementedError(
"Removing autocorrelations is not yet properly implemented."
)
#if args.removeautocorrs:
# raise NotImplementedError(
# "Removing autocorrelations is not yet properly implemented."
# )
Comment on lines +2210 to +2213
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this commented out?


if return_parser:
return args, parser
else:
return args


def main(args, parser):

log = logging.getLogger(__name__)
Expand All @@ -2189,8 +2232,8 @@ def main(args, parser):
log.setLevel(logging.DEBUG)

# Setup the cores and GPUs to use
cores = [0, 2, 3, 4, 5, 6, 7]
gpus = [0, 0, 0, 0, 0, 0, 0]
cores = [3, 4, 5, 6, 7]
gpus = [0, 0, 0, 0, 0]

# Setup the signal handling
ops = []
Expand All @@ -2205,6 +2248,12 @@ def handle_signal_terminate(signum, frame):
log.warning("Received signal %i %s", signum, SIGNAL_NAMES[signum])
try:
ops[0].shutdown()
if SIGNAL_NAMES[signum] == "SIGINT":
print("****Observation is Interrupted****")
os._exit(0)
if SIGNAL_NAMES[signum] == "SIGALRM":
print("****Observation is Complete****")
os._exit(0)
except IndexError:
pass
shutdown_event.set()
Expand All @@ -2215,6 +2264,7 @@ def handle_signal_terminate(signum, frame):
signal.SIGQUIT,
signal.SIGTERM,
signal.SIGTSTP,
signal.SIGALRM
]:
signal.signal(sig, handle_signal_terminate)

Expand Down Expand Up @@ -2391,6 +2441,9 @@ def handle_signal_terminate(signum, frame):
thread.daemon = False
thread.start()

signal.alarm(args.duration)


while not shutdown_event.is_set():
# Keep threads alive -- if reader is still alive, prevent timeout signal from executing
if threads[0].is_alive():
Expand Down