diff --git a/LWA_EPIC/LWA_EPIC.py b/LWA_EPIC/LWA_EPIC.py index bedd9958..181bf4c2 100755 --- a/LWA_EPIC/LWA_EPIC.py +++ b/LWA_EPIC/LWA_EPIC.py @@ -55,6 +55,11 @@ from lsl.reader.ldp import TBNFile, TBFFile from lsl.common.stations import lwa1, lwasv +#Optimized Bifrost blocks for EPIC +from bifrost.VGrid import VGrid +from bifrost.XGrid import XGrid +from bifrost.aCorr import aCorr + # some py2/3 compatibility if sys.version_info.major < 3: range = xrange # noqa @@ -742,8 +747,8 @@ def main(self): ishape = (self.ntime_gulp, nchan, nstand, npol) ogulp_size = self.ntime_gulp * self.nchan_out * nstand * self.npol_out * 1 # ci4 oshape = (self.ntime_gulp, self.nchan_out, nstand, self.npol_out) - self.iring.resize(igulp_size) - self.oring.resize(ogulp_size) # , obuf_size) + self.iring.resize(igulp_size, buffer_factor= 8) + self.oring.resize(ogulp_size, buffer_factor= 128) # , obuf_size) ohdr = ihdr.copy() ohdr["nchan"] = self.nchan_out @@ -978,8 +983,8 @@ def main(self): oshape = (1, nchan, npol ** 2, self.grid_size, self.grid_size) ogulp_size = nchan * npol ** 2 * self.grid_size * self.grid_size * 8 - self.iring.resize(igulp_size) - self.oring.resize(ogulp_size, buffer_factor=5) + self.iring.resize(igulp_size, buffer_factor=128) + self.oring.resize(ogulp_size, buffer_factor=256) prev_time = time.time() with oring.begin_sequence(time_tag=iseq.time_tag, header=ohdr_str) as oseq: iseq_spans = iseq.read(igulp_size) @@ -1035,18 +1040,24 @@ def main(self): # Grid the Antennas if self.benchmark is True: timeg1 = time.time() - try: - bf_romein.execute(udata, gdata) + bf_vgrid.execute(udata, gdata) except NameError: - bf_romein = Romein() - bf_romein.init(self.locs, gphases, self.grid_size, polmajor=False) - bf_romein.execute(udata, gdata) + bf_vgrid = VGrid() + bf_vgrid.init(self.locs, gphases, self.grid_size, polmajor=False) + bf_vgrid.execute(udata, gdata) + + #try: + # bf_romein.execute(udata, gdata) + #except NameError: + # bf_romein = Romein() + # bf_romein.init(self.locs, gphases, self.grid_size, polmajor=False) + # bf_romein.execute(udata, gdata) gdata = gdata.reshape(self.ntime_gulp * nchan * npol, self.grid_size, self.grid_size) # gdata = self.LinAlgObj.matmul(1.0, udata, bfantgridmap, 0.0, gdata) if self.benchmark is True: timeg2 = time.time() - print(" Romein time: %f" % (timeg2 - timeg1)) + print(" Grid time: %f" % (timeg2 - timeg1)) # Inverse transform @@ -1135,19 +1146,35 @@ def main(self): ) # Cross multiply to calculate autocorrs - bifrost.map( - "a(i,j,k,l) += (b(i,j,k,l/2) * b(i,j,k,l%2).conj())", - {"a": autocorrs, "b": udata, "t": self.ntime_gulp}, - axis_names=("i", "j", "k", "l"), - shape=(self.ntime_gulp, nchan, nstand, npol ** 2), + #bifrost.map( + # "a(i,j,k,l) += (b(i,j,k,l/2) * b(i,j,k,l%2).conj())", + # {"a": autocorrs, "b": udata, "t": self.ntime_gulp}, + # axis_names=("i", "j", "k", "l"), + # shape=(self.ntime_gulp, nchan, nstand, npol ** 2), + #) + try: + bf_auto.execute(udata, autocorrs) + except NameError: + bf_auto = aCorr() + bf_auto.init(self.locs, polmajor=False) + bf_auto.execute(udata, autocorrs) + autocorrs = autocorrs.reshape( + self.ntime_gulp, nchan, nstand, npol ** 2 ) - bifrost.map( - "a(i,j,p,k,l) += b(0,i,j,p/2,k,l)*b(0,i,j,p%2,k,l).conj()", - {"a": crosspol, "b": gdata}, - axis_names=("i", "j", "p", "k", "l"), - shape=(self.ntime_gulp, nchan, npol ** 2, self.grid_size, self.grid_size), - ) + + #bifrost.map( + # "a(i,j,p,k,l) += b(0,i,j,p/2,k,l)*b(0,i,j,p%2,k,l).conj()", + # {"a": crosspol, "b": gdata}, + # axis_names=("i", "j", "p", "k", "l"), + # shape=(self.ntime_gulp, nchan, npol ** 2, self.grid_size, self.grid_size), + #) + try: + bf_gmul.execute(gdata, crosspol) + except NameError: + bf_gmul = XGrid() + bf_gmul.init(self.grid_size, polmajor=False) + bf_gmul.execute(gdata, crosspol) crosspol = crosspol.reshape( self.ntime_gulp, nchan, npol ** 2, self.grid_size, self.grid_size ) @@ -1168,14 +1195,22 @@ def main(self): autocorr_g = autocorr_g.reshape( 1, nchan, npol ** 2, self.grid_size, self.grid_size ) + #try: + # bf_romein_autocorr.execute(autocorrs_av, autocorr_g) + #except NameError: + # bf_romein_autocorr = Romein() + # bf_romein_autocorr.init( + # autocorr_lo, autocorr_il, self.grid_size, polmajor=False + # ) + # bf_romein_autocorr.execute(autocorrs_av, autocorr_g) try: - bf_romein_autocorr.execute(autocorrs_av, autocorr_g) + bf_vgrid_autocorr.execute(autocorrs_av, autocorr_g) except NameError: - bf_romein_autocorr = Romein() - bf_romein_autocorr.init( + bf_vgrid_autocorr = VGrid() + bf_vgrid_autocorr.init( autocorr_lo, autocorr_il, self.grid_size, polmajor=False ) - bf_romein_autocorr.execute(autocorrs_av, autocorr_g) + bf_vgrid_autocorr.execute(autocorrs_av, autocorr_g) autocorr_g = autocorr_g.reshape(1 * nchan * npol ** 2, self.grid_size, self.grid_size) # autocorr_g = romein_float(autocorrs_av,autocorr_g,autocorr_il,autocorr_lx,autocorr_ly,autocorr_lz,self.ant_extent,self.grid_size,nstand,nchan*npol**2) # Inverse FFT @@ -2096,6 +2131,13 @@ def gen_args(return_parser=False): default=1000, help="How many milliseconds to accumulate an image over", ) + group3.add_argument( + "--duration", + type=int, + default=3600, + help="Duration of EPIC (seconds)", + ) + group4 = parser.add_argument_group("Correlation Options") group4.add_argument( @@ -2165,16 +2207,17 @@ def gen_args(return_parser=False): print("Output directory does not exist. Defaulting to current directory.") args.out_dir = "." - if args.removeautocorrs: - raise NotImplementedError( - "Removing autocorrelations is not yet properly implemented." - ) + #if args.removeautocorrs: + # raise NotImplementedError( + # "Removing autocorrelations is not yet properly implemented." + # ) if return_parser: return args, parser else: return args + def main(args, parser): log = logging.getLogger(__name__) @@ -2189,8 +2232,8 @@ def main(args, parser): log.setLevel(logging.DEBUG) # Setup the cores and GPUs to use - cores = [0, 2, 3, 4, 5, 6, 7] - gpus = [0, 0, 0, 0, 0, 0, 0] + cores = [3, 4, 5, 6, 7] + gpus = [0, 0, 0, 0, 0] # Setup the signal handling ops = [] @@ -2205,6 +2248,12 @@ def handle_signal_terminate(signum, frame): log.warning("Received signal %i %s", signum, SIGNAL_NAMES[signum]) try: ops[0].shutdown() + if SIGNAL_NAMES[signum] == "SIGINT": + print("****Observation is Interrupted****") + os._exit(0) + if SIGNAL_NAMES[signum] == "SIGALRM": + print("****Observation is Complete****") + os._exit(0) except IndexError: pass shutdown_event.set() @@ -2215,6 +2264,7 @@ def handle_signal_terminate(signum, frame): signal.SIGQUIT, signal.SIGTERM, signal.SIGTSTP, + signal.SIGALRM ]: signal.signal(sig, handle_signal_terminate) @@ -2391,6 +2441,9 @@ def handle_signal_terminate(signum, frame): thread.daemon = False thread.start() + signal.alarm(args.duration) + + while not shutdown_event.is_set(): # Keep threads alive -- if reader is still alive, prevent timeout signal from executing if threads[0].is_alive():