diff --git a/pyat/at/acceptance/acceptance.py b/pyat/at/acceptance/acceptance.py index 913ac9511c..6f4593e84c 100644 --- a/pyat/at/acceptance/acceptance.py +++ b/pyat/at/acceptance/acceptance.py @@ -1,30 +1,41 @@ """Acceptance computation""" + import numpy from .boundary import GridMode + # noinspection PyProtectedMember from .boundary import boundary_search from typing import Optional, Sequence import multiprocessing -from ..lattice import Lattice, Refpts, frequency_control, AtError +from ..lattice import Lattice, Refpts, frequency_control -__all__ = ['get_acceptance', 'get_1d_acceptance', 'get_horizontal_acceptance', - 'get_vertical_acceptance', 'get_momentum_acceptance'] +__all__ = [ + "get_acceptance", + "get_1d_acceptance", + "get_horizontal_acceptance", + "get_vertical_acceptance", + "get_momentum_acceptance", +] @frequency_control def get_acceptance( - ring: Lattice, planes, npoints, amplitudes, - nturns: Optional[int] = 1024, - refpts: Optional[Refpts] = None, - dp: Optional[float] = None, - offset: Sequence[float] = None, bounds=None, - grid_mode: Optional[GridMode] = GridMode.RADIAL, - use_mp: Optional[bool] = False, - verbose: Optional[bool] = True, - divider: Optional[int] = 2, - shift_zero: Optional[float] = 1.0e-6, - start_method: Optional[str] = None, + ring: Lattice, + planes, + npoints, + amplitudes, + nturns: Optional[int] = 1024, + refpts: Optional[Refpts] = None, + dp: Optional[float] = None, + offset: Sequence[float] = None, + bounds=None, + grid_mode: Optional[GridMode] = GridMode.RADIAL, + use_mp: Optional[bool] = False, + verbose: Optional[bool] = True, + divider: Optional[int] = 2, + shift_zero: Optional[float] = 1.0e-6, + start_method: Optional[str] = None, ): # noinspection PyUnresolvedReferences r"""Computes the acceptance at ``repfts`` observation points @@ -91,92 +102,86 @@ def get_acceptance( * When``use_mp=True`` all the available CPUs will be used. This behavior can be changed by setting ``at.DConstant.patpass_poolsize`` to the desired value + * When multiple ``refpts`` are provided particles are first + projected to the beginning of the ring with tracking. Then, + all particles are tracked up to ``nturns``. This allows to + do most of the work in a single function call and allows for + full parallelization. """ kwargs = {} if start_method is not None: - kwargs['start_method'] = start_method + kwargs["start_method"] = start_method if verbose: nproc = multiprocessing.cpu_count() - print('\n{0} cpu found for acceptance calculation'.format(nproc)) + print("\n{0} cpu found for acceptance calculation".format(nproc)) if use_mp: nprocu = nproc - print('Multi-process acceptance calculation selected...') + print("Multi-process acceptance calculation selected...") if nproc == 1: - print('Consider use_mp=False for single core computations') + print("Consider use_mp=False for single core computations") else: nprocu = 1 - print('Single process acceptance calculation selected...') + print("Single process acceptance calculation selected...") if nproc > 1: - print('Consider use_mp=True for parallelized computations') + print("Consider use_mp=True for parallelized computations") np = numpy.atleast_1d(npoints) na = 2 if len(np) == 2: na = np[1] npp = numpy.prod(npoints) - rpp = 2*numpy.ceil(numpy.log2(np[0]))*numpy.ceil(na/nprocu) - mpp = npp/nprocu + rpp = 2 * numpy.ceil(numpy.log2(np[0])) * numpy.ceil(na / nprocu) + mpp = npp / nprocu if rpp > mpp: - cond = (grid_mode is GridMode.RADIAL or - grid_mode is GridMode.CARTESIAN) + cond = grid_mode is GridMode.RADIAL or grid_mode is GridMode.CARTESIAN else: cond = grid_mode is GridMode.RECURSIVE if rpp > mpp and not cond: - print('The estimated load for grid mode is {0}'.format(mpp)) - print('The estimated load for recursive mode is {0}'.format(rpp)) - print('{0} or {1} is recommended'.format(GridMode.RADIAL, - GridMode.CARTESIAN)) + print("The estimated load for grid mode is {0}".format(mpp)) + print("The estimated load for recursive mode is {0}".format(rpp)) + print( + "{0} or {1} is recommended".format(GridMode.RADIAL, GridMode.CARTESIAN) + ) elif rpp < mpp and not cond: - print('The estimated load for grid mode is {0}'.format(mpp)) - print('The estimated load for recursive mode is {0}'.format(rpp)) - print('{0} is recommended'.format(GridMode.RECURSIVE)) - - boundary = [] - survived = [] - grid = [] - if refpts is not None: - rp = ring.uint32_refpts(refpts) - else: - rp = numpy.atleast_1d(refpts) - if offset is not None: - try: - offset = numpy.broadcast_to(offset, (len(rp), 6)) - except ValueError: - msg = ('offset and refpts have incoherent ' - 'shapes: {0}, {1}'.format(numpy.shape(offset), - numpy.shape(refpts))) - raise AtError(msg) - else: - offset=[None for _ in rp] - for r, o in zip(rp, offset): - b, s, g = boundary_search(ring, planes, npoints, amplitudes, - nturns=nturns, obspt=r, dp=dp, - offset=o, bounds=bounds, - grid_mode=grid_mode, use_mp=use_mp, - verbose=verbose, divider=divider, - shift_zero=shift_zero, **kwargs) - boundary.append(b) - survived.append(s) - grid.append(g) - if len(rp) == 1: - return boundary[0], survived[0], grid[0] - else: - return boundary, survived, grid + print("The estimated load for grid mode is {0}".format(mpp)) + print("The estimated load for recursive mode is {0}".format(rpp)) + print("{0} is recommended".format(GridMode.RECURSIVE)) + + b, s, g = boundary_search( + ring, + planes, + npoints, + amplitudes, + nturns=nturns, + obspt=refpts, + dp=dp, + offset=offset, + bounds=bounds, + grid_mode=grid_mode, + use_mp=use_mp, + verbose=verbose, + divider=divider, + shift_zero=shift_zero, + **kwargs, + ) + return b, s, g def get_1d_acceptance( - ring: Lattice, plane: str, resolution: float, amplitude: float, - nturns: Optional[int] = 1024, - refpts: Optional[Refpts] = None, - dp: Optional[float] = None, - offset: Sequence[float] = None, - grid_mode: Optional[GridMode] = GridMode.RADIAL, - use_mp: Optional[bool] = False, - verbose: Optional[bool] = False, - divider: Optional[int] = 2, - shift_zero: Optional[float] = 1.0e-6, - start_method: Optional[str] = None, - + ring: Lattice, + plane: str, + resolution: float, + amplitude: float, + nturns: Optional[int] = 1024, + refpts: Optional[Refpts] = None, + dp: Optional[float] = None, + offset: Sequence[float] = None, + grid_mode: Optional[GridMode] = GridMode.RADIAL, + use_mp: Optional[bool] = False, + verbose: Optional[bool] = False, + divider: Optional[int] = 2, + shift_zero: Optional[float] = 1.0e-6, + start_method: Optional[str] = None, ): r"""Computes the 1D acceptance at ``refpts`` observation points @@ -231,29 +236,44 @@ def get_1d_acceptance( * When``use_mp=True`` all the available CPUs will be used. This behavior can be changed by setting ``at.DConstant.patpass_poolsize`` to the desired value + * When multiple ``refpts`` are provided particles are first + projected to the beginning of the ring with tracking. Then, + all particles are tracked up to ``nturns``. This allows to + do most of the work in a single function call and allows for + full parallelization. """ if not use_mp: grid_mode = GridMode.RECURSIVE - assert len(numpy.atleast_1d(plane)) == 1, \ - '1D acceptance: single plane required' - assert numpy.isscalar(resolution), '1D acceptance: scalar args required' - assert numpy.isscalar(amplitude), '1D acceptance: scalar args required' - npoint = numpy.ceil(amplitude/resolution) + assert len(numpy.atleast_1d(plane)) == 1, "1D acceptance: single plane required" + assert numpy.isscalar(resolution), "1D acceptance: scalar args required" + assert numpy.isscalar(amplitude), "1D acceptance: scalar args required" + npoint = numpy.ceil(amplitude / resolution) if grid_mode is not GridMode.RECURSIVE: - assert npoint > 1, \ - 'Grid has only one point: increase amplitude or reduce resolution' - b, s, g = get_acceptance(ring, plane, npoint, amplitude, - nturns=nturns, dp=dp, refpts=refpts, - grid_mode=grid_mode, use_mp=use_mp, - verbose=verbose, start_method=start_method, - divider=divider, shift_zero=shift_zero, - offset=offset) + assert ( + npoint > 1 + ), "Grid has only one point: increase amplitude or reduce resolution" + b, s, g = get_acceptance( + ring, + plane, + npoint, + amplitude, + nturns=nturns, + dp=dp, + refpts=refpts, + grid_mode=grid_mode, + use_mp=use_mp, + verbose=verbose, + start_method=start_method, + divider=divider, + shift_zero=shift_zero, + offset=offset, + ) return numpy.squeeze(b), s, g -def get_horizontal_acceptance(ring: Lattice, - resolution: float, amplitude: float, - *args, **kwargs): +def get_horizontal_acceptance( + ring: Lattice, resolution: float, amplitude: float, *args, **kwargs +): r"""Computes the 1D horizontal acceptance at ``refpts`` observation points See :py:func:`get_acceptance` @@ -306,13 +326,18 @@ def get_horizontal_acceptance(ring: Lattice, * When``use_mp=True`` all the available CPUs will be used. This behavior can be changed by setting ``at.DConstant.patpass_poolsize`` to the desired value + * When multiple ``refpts`` are provided particles are first + projected to the beginning of the ring with tracking. Then, + all particles are tracked up to ``nturns``. This allows to + do most of the work in a single function call and allows for + full parallelization. """ - return get_1d_acceptance(ring, 'x', resolution, amplitude, *args, **kwargs) + return get_1d_acceptance(ring, "x", resolution, amplitude, *args, **kwargs) -def get_vertical_acceptance(ring: Lattice, - resolution: float, amplitude: float, - *args, **kwargs): +def get_vertical_acceptance( + ring: Lattice, resolution: float, amplitude: float, *args, **kwargs +): r"""Computes the 1D vertical acceptance at refpts observation points See :py:func:`get_acceptance` @@ -365,13 +390,18 @@ def get_vertical_acceptance(ring: Lattice, * When``use_mp=True`` all the available CPUs will be used. This behavior can be changed by setting ``at.DConstant.patpass_poolsize`` to the desired value + * When multiple ``refpts`` are provided particles are first + projected to the beginning of the ring with tracking. Then, + all particles are tracked up to ``nturns``. This allows to + do most of the work in a single function call and allows for + full parallelization. """ - return get_1d_acceptance(ring, 'y', resolution, amplitude, *args, **kwargs) + return get_1d_acceptance(ring, "y", resolution, amplitude, *args, **kwargs) -def get_momentum_acceptance(ring: Lattice, - resolution: float, amplitude: float, - *args, **kwargs): +def get_momentum_acceptance( + ring: Lattice, resolution: float, amplitude: float, *args, **kwargs +): r"""Computes the 1D momentum acceptance at refpts observation points See :py:func:`get_acceptance` @@ -424,9 +454,13 @@ def get_momentum_acceptance(ring: Lattice, * When``use_mp=True`` all the available CPUs will be used. This behavior can be changed by setting ``at.DConstant.patpass_poolsize`` to the desired value + * When multiple ``refpts`` are provided particles are first + projected to the beginning of the ring with tracking. Then, + all particles are tracked up to ``nturns``. This allows to + do most of the work in a single function call and allows for + full parallelization. """ - return get_1d_acceptance(ring, 'dp', resolution, amplitude, - *args, **kwargs) + return get_1d_acceptance(ring, "dp", resolution, amplitude, *args, **kwargs) Lattice.get_acceptance = get_acceptance diff --git a/pyat/at/acceptance/boundary.py b/pyat/at/acceptance/boundary.py index 465018b552..5ef6e2fefe 100644 --- a/pyat/at/acceptance/boundary.py +++ b/pyat/at/acceptance/boundary.py @@ -4,7 +4,7 @@ grid definitions """ -from at.lattice import Lattice, AtError, AtWarning +from at.lattice import Lattice, AtError, AtWarning, Refpts from typing import Optional, Sequence from enum import Enum import numpy @@ -13,47 +13,46 @@ import time import warnings -__all__ = ['GridMode'] +__all__ = ["GridMode"] -_pdict = {'x': 0, 'xp': 1, - 'y': 2, 'yp': 3, - 'dp': 4, 'ct': 5} +_pdict = {"x": 0, "xp": 1, "y": 2, "yp": 3, "dp": 4, "ct": 5} class GridMode(Enum): """ Grid definition for 2D acceptance boundary search """ - RADIAL = 0 #: full [:math:`\:r, \theta\:`] grid - CARTESIAN = 1 #: full [:math:`\:x, y\:`] grid - RECURSIVE = 2 #: radial recursive search + RADIAL = 0 #: full [:math:`\:r, \theta\:`] grid + CARTESIAN = 1 #: full [:math:`\:x, y\:`] grid + RECURSIVE = 2 #: radial recursive search -def grid_config(planes, amplitudes, npoints, bounds, grid_mode, - shift_zero): - """" + +def grid_config(planes, amplitudes, npoints, bounds, grid_mode, shift_zero): + """ Returns an object that defines the grid configuration """ bounds = numpy.atleast_2d(bounds) bounds = tuple(map(tuple, bounds)) shape = numpy.array(npoints, dtype=numpy.int32) - d = {'planes': numpy.atleast_1d(planes), - 'planesi': numpy.atleast_1d(get_plane_index(planes)), - 'amplitudes': numpy.atleast_1d(amplitudes), - 'shape': numpy.atleast_1d(shape), - 'bounds': bounds, - 'mode': grid_mode, - 'shift_zero': shift_zero} - return namedtuple('config', d.keys())(**d) + d = { + "planes": numpy.atleast_1d(planes), + "planesi": numpy.atleast_1d(get_plane_index(planes)), + "amplitudes": numpy.atleast_1d(amplitudes), + "shape": numpy.atleast_1d(shape), + "bounds": bounds, + "mode": grid_mode, + "shift_zero": shift_zero, + } + return namedtuple("config", d.keys())(**d) def grid(grid, offset): - """" + """ Returns a grid object """ - d = {'grid': numpy.atleast_2d(grid), - 'offset': numpy.atleast_1d(offset)} - return namedtuple('grid', d.keys())(**d) + d = {"grid": numpy.atleast_2d(grid), "offset": numpy.atleast_1d(offset)} + return namedtuple("grid", d.keys())(**d) def get_plane_index(planes): @@ -66,9 +65,9 @@ def get_plane_index(planes): try: planesi = numpy.append(planesi, _pdict[p]) except KeyError: - raise AtError('Allowed values for plane are x,xp,y,yp,dp,ct') + raise AtError("Allowed values for plane are x,xp,y,yp,dp,ct") else: - raise AtError('Allowed values for plane are x,xp,y,yp,dp,ct') + raise AtError("Allowed values for plane are x,xp,y,yp,dp,ct") return planesi @@ -78,7 +77,7 @@ def set_ring_orbit(ring, dp, obspt, orbit): closed orbit """ if obspt is not None: - assert numpy.isscalar(obspt), 'Scalar value needed for obspt' + assert numpy.isscalar(obspt), "Scalar value needed for obspt" ring = ring.rotate(obspt) if orbit is None: @@ -87,8 +86,9 @@ def set_ring_orbit(ring, dp, obspt, orbit): return orbit, ring -def grid_configuration(planes, npoints, amplitudes, grid_mode, bounds=None, - shift_zero=1.0e-6): +def grid_configuration( + planes, npoints, amplitudes, grid_mode, bounds=None, shift_zero=1.0e-6 +): """ Return a grid configuration based on user input parameters, the ordering is as follows: CARTESIAN: (x,y), RADIAL/RECURSIVE (r, theta). @@ -96,17 +96,16 @@ def grid_configuration(planes, npoints, amplitudes, grid_mode, bounds=None, """ ndims = len(numpy.atleast_1d(planes)) if ndims > 2 or ndims == 0: - raise AtError('planes can have 1 or 2 element (1D or 2D aperture)') + raise AtError("planes can have 1 or 2 element (1D or 2D aperture)") elif ndims == 1 and grid_mode is GridMode.RADIAL: grid_mode = GridMode.CARTESIAN if numpy.shape(numpy.atleast_1d(npoints)) != (ndims,): - raise AtError('npoints shape should be (len(planes),)') + raise AtError("npoints shape should be (len(planes),)") if numpy.shape(numpy.atleast_1d(amplitudes)) != (ndims,): - raise AtError('amplitudes shape should be (len(planes),)') - if (numpy.shape(numpy.atleast_2d(bounds)) != (ndims, 2) - and bounds is not None): - raise AtError('bounds shape should be (len(planes),2)') + raise AtError("amplitudes shape should be (len(planes),)") + if numpy.shape(numpy.atleast_2d(bounds)) != (ndims, 2) and bounds is not None: + raise AtError("bounds shape should be (len(planes),2)") if grid_mode is GridMode.RADIAL or grid_mode is GridMode.RECURSIVE: if bounds is None: @@ -114,12 +113,11 @@ def grid_configuration(planes, npoints, amplitudes, grid_mode, bounds=None, bounds[0][bounds[0] == 0] = 1.0e-6 elif grid_mode is GridMode.CARTESIAN: if bounds is None: - bounds = numpy.array([[p-1, 1] for p in range(ndims)]) + bounds = numpy.array([[p - 1, 1] for p in range(ndims)]) else: - raise AtError('GridMode {0} undefined.'.format(grid_mode)) + raise AtError("GridMode {0} undefined.".format(grid_mode)) - config = grid_config(planes, amplitudes, npoints, - bounds, grid_mode, shift_zero) + config = grid_config(planes, amplitudes, npoints, bounds, grid_mode, shift_zero) return config @@ -129,8 +127,9 @@ def get_parts(config, offset): and initial offset, returns a grid object containing the grid and the offset from which the array can be reconstructed """ + def get_part_grid_uniform(bnd, np, amp): - x = [numpy.linspace(*b, n)*a for a, b, n in zip(amp, bnd, np)] + x = [numpy.linspace(*b, n) * a for a, b, n in zip(amp, bnd, np)] try: g1, g2 = numpy.meshgrid(*x) except ValueError: @@ -140,8 +139,8 @@ def get_part_grid_uniform(bnd, np, amp): def get_part_grid_radial(bnd, np, amp): x = [numpy.linspace(*b, n) for b, n in zip(bnd, np)] g1, g2 = numpy.meshgrid(*x) - g1r = amp[0]*numpy.cos(g2)*g1 - g2r = amp[1]*numpy.sin(g2)*g1 + g1r = amp[0] * numpy.cos(g2) * g1 + g2r = amp[1] * numpy.sin(g2) * g1 return numpy.array([g1r.flatten(), g2r.flatten()]) pind = config.planesi @@ -157,7 +156,7 @@ def get_part_grid_radial(bnd, np, amp): parts = numpy.zeros((6, numpy.prod(np))) parts[pind, :] = [g[i] for i in range(len(pind))] offset = numpy.array(offset) + config.shift_zero - parts = (parts.T+offset).T + parts = (parts.T + offset).T return parts, grid(g, offset[pind]) @@ -165,14 +164,23 @@ def get_survived(parts, ring, nturns, use_mp, **kwargs): """ Track a grid through the ring and extract survived particles """ - _, _, td = ring.track(parts, nturns=nturns, losses=True, use_mp=use_mp, **kwargs) - return numpy.invert(td['loss_map'].islost) + _, _, td = ring.track( + parts, + nturns=nturns, + losses=True, + use_mp=use_mp, + refpts=None, + in_place=True, + **kwargs, + ) + return numpy.invert(td["loss_map"].islost) def get_grid_boundary(mask, grid, config): """ Compute the boundary of survided particles array """ + def nearest_order(grid): # keep only max r for each angle angle = numpy.arctan2(*grid) @@ -196,13 +204,16 @@ def nearest_order(grid): for i in range(1, len(x)): xnow = x[iorder[-1]] ynow = y[iorder[-1]] - dd = numpy.sqrt(((x-xnow)/dxmin)**2+((y-ynow)/dymin)**2) + dd = numpy.sqrt(((x - xnow) / dxmin) ** 2 + ((y - ynow) / dymin) ** 2) if i <= 3: ic = [j for j in numpy.argsort(dd) if j not in iorder] else: - direction = numpy.sign(iorder[-1]-iorder[-2]) - ic = [j for j in numpy.argsort(dd) if j not in iorder - and numpy.sign(j-iorder[-1]) == direction] + direction = numpy.sign(iorder[-1] - iorder[-2]) + ic = [ + j + for j in numpy.argsort(dd) + if j not in iorder and numpy.sign(j - iorder[-1]) == direction + ] if len(ic) > 0: iorder.append(ic[0]) # finally connect both ends if distance within unit square @@ -210,7 +221,7 @@ def nearest_order(grid): ynow = y[iorder[-1]] xs = x[iorder[0]] ys = y[iorder[0]] - dd = numpy.sqrt(((xs-xnow)/dxmin)**2+((ys-ynow)/dymin)**2) + dd = numpy.sqrt(((xs - xnow) / dxmin) ** 2 + ((ys - ynow) / dymin) ** 2) if dd < 1.5: iorder.append(iorder[0]) gf = gf[:, iorder] @@ -221,7 +232,7 @@ def search_bnd(ma, sa): for j, m in enumerate(ma): bnd = sa[:, j] if not m and j > 0: - bnd = sa[:, j-1] + bnd = sa[:, j - 1] break return bnd @@ -246,8 +257,7 @@ def vector_boundary(mask, grid): g = grid.grid xp, xn = g[0] >= 0, g[0] <= 0 bp = search_bnd(mask[xp], g[:, xp]) - bn = search_bnd(numpy.flip(mask[xn]), - numpy.flip(g[:, xn], axis=1)) + bn = search_bnd(numpy.flip(mask[xn]), numpy.flip(g[:, xn], axis=1)) return numpy.squeeze([bn[0], bp[0]]) def radial_boundary(mask, grid): @@ -261,71 +271,135 @@ def radial_boundary(mask, grid): return bnd if not numpy.any(mask): - msg = ("No particle survived, please check your grid " - "or lattice. Acceptance set to [0.0, 0.0].") + msg = ( + "No particle survived, please check your grid " + "or lattice. Acceptance set to [0.0, 0.0]." + ) warnings.warn(AtWarning(msg)) cnt = numpy.flip(config.shape) if len(cnt) == 1: return numpy.zeros(2) else: - return numpy.zeros((2, 1)) + return numpy.zeros((2, 1)) if config.mode is GridMode.RADIAL: return radial_boundary(mask, grid) elif config.mode is GridMode.CARTESIAN: return grid_boundary(mask, grid, config) else: - raise AtError('GridMode {0} undefined.'.format(grid.mode)) - - -def grid_boundary_search(ring, planes, npoints, amplitudes, nturns=1024, - obspt=None, dp=None, offset=None, bounds=None, - grid_mode=GridMode.RADIAL, use_mp=False, - verbose=True, shift_zero=1.0e-9, **kwargs): + raise AtError("GridMode {0} undefined.".format(grid.mode)) + + +def grid_boundary_search( + ring, + planes, + npoints, + amplitudes, + nturns=1024, + obspt=None, + dp=None, + offset=None, + bounds=None, + grid_mode=GridMode.RADIAL, + use_mp=False, + verbose=True, + shift_zero=1.0e-9, + **kwargs, +): """ Search for the boundary by tracking a grid """ - offset, newring = set_ring_orbit(ring, dp, obspt, - offset) - config = grid_configuration(planes, npoints, amplitudes, - grid_mode, bounds=bounds, - shift_zero=shift_zero) + config = grid_configuration( + planes, npoints, amplitudes, grid_mode, bounds=bounds, shift_zero=shift_zero + ) if verbose: - print('\nRunning grid boundary search:') - if obspt is None: - print('Element {0}, obspt={1}'.format(ring[0].FamName, 0)) + print("\nRunning grid boundary search:") + if len(obspt) == 1: + if obspt[0] is None: + print("Element {0}, obspt={1}".format(ring[0].FamName, 0)) + else: + print("Element {0}, obspt={1}".format(ring[obspt].FamName, obspt)) else: - print('Element {0}, obspt={1}'.format(ring[obspt].FamName, - obspt)) - print('The grid mode is {0}'.format(config.mode)) - print('The planes are {0}'.format(config.planes)) - print('Number of steps are {0}'.format(config.shape)) - print('The maximum amplitudes are {0}'.format(config.amplitudes)) - print('The maximum boundaries are {0}'.format(config.bounds)) - print('The initial offset is {0} with dp={1}'.format(offset, dp)) + print( + "{0} Elements from {1}, obspt={2} to {3}, obspt={4}".format( + len(obspt), + ring[obspt[0]].FamName, + obspt[0], + ring[obspt[-1]].FamName, + obspt[-1], + ) + ) + print("The grid mode is {0}".format(config.mode)) + print("The planes are {0}".format(config.planes)) + print("Number of steps are {0}".format(config.shape)) + print("The maximum amplitudes are {0}".format(config.amplitudes)) + print("The maximum boundaries are {0}".format(config.bounds)) t0 = time.time() - parts, grid = get_parts(config, offset) - mask = get_survived(parts, newring, nturns, use_mp, **kwargs) - survived = grid.grid[:, mask] - boundary = get_grid_boundary(mask, grid, config) + allparts = [] + grids = [] + offsets = [] + + for i, obs, orbit in zip(numpy.arange(len(obspt)), obspt, offset): + orbit, newring = set_ring_orbit(ring, dp, obs, orbit) + parts, grid = get_parts(config, orbit) + obs = 0 if obs is None else obs + dpp = 0.0 if dp is None else dp + if verbose: + print( + "\r{4}/{5}: Projecting obs=({0}, {1}) to the start of the ring, " + "the initial offset is {2} with dp={3}".format( + ring[obs].FamName, obs, orbit, dpp, i + 1, len(obspt) + ) + ) + newring[: len(ring) - obs].track( + parts, use_mp=use_mp, in_place=True, refpts=None, **kwargs + ) + allparts.append(parts) + grids.append(grid) + offsets.append(orbit) if verbose: - print('Calculation took {0}'.format(time.time()-t0)) - return boundary, survived, grid.grid - - -def recursive_boundary_search(ring, planes, npoints, amplitudes, nturns=1024, - obspt=None, dp=None, offset=None, bounds=None, - use_mp=False, divider=2, verbose=True, - shift_zero=1.0e-9, **kwargs): + print("Starting the multi-turn tracking...") + allparts = numpy.concatenate(allparts, axis=1) + mask = get_survived(allparts, ring, nturns, use_mp, **kwargs) + mask = numpy.split(mask, len(grids)) + survived = [g.grid[:, m] for g, m in zip(grids, mask)] + boundary = [get_grid_boundary(m, g, config) for g, m in zip(grids, mask)] + grids = [g.grid for g in grids] + if verbose: + print("Calculation took {0}".format(time.time() - t0)) + if len(obspt) == 1: + return boundary[0], survived[0], grids[0] + else: + return boundary, survived, grids + + +def recursive_boundary_search( + ring, + planes, + npoints, + amplitudes, + nturns=1024, + obspt=None, + dp=None, + offset=None, + bounds=None, + use_mp=False, + divider=2, + verbose=True, + shift_zero=1.0e-9, + **kwargs, +): """ Recursively search for the boundary in a given plane and direction (angle) """ - def search_boundary(planesi, angles, rtol, rsteps, nturns, - offset, use_mp, **kwargs): - ftol = min(rtol/rsteps) + def search_boundary( + planesi, angles, rtol, rsteps, nturns, offset, use_mp, **kwargs + ): + + ftol = min(rtol / rsteps) cs = numpy.squeeze([numpy.cos(angles), numpy.sin(angles)]) cs = numpy.around(cs, decimals=9) fact = numpy.ones(len(angles)) @@ -336,46 +410,52 @@ def search_boundary(planesi, angles, rtol, rsteps, nturns, while numpy.any(survived): for i, pi in enumerate(planesi): - part[pi, survived] += cs[i, survived]*rsteps[i]*fact[survived] - istracked = numpy.array([not numpy.any([numpy.allclose(p, g, - rtol=1.0e-9) - for g in grid.T]) - for p in part[planesi].T]) - survived = numpy.array([numpy.any([numpy.allclose(p, m, - rtol=1.0e-9) - for m in mask.T]) - for p in part[planesi].T]) + part[pi, survived] += cs[i, survived] * rsteps[i] * fact[survived] + istracked = numpy.array( + [ + not numpy.any([numpy.allclose(p, g, rtol=1.0e-9) for g in grid.T]) + for p in part[planesi].T + ] + ) + survived = numpy.array( + [ + numpy.any([numpy.allclose(p, m, rtol=1.0e-9) for m in mask.T]) + for p in part[planesi].T + ] + ) pt = part[:, istracked] - grid = numpy.hstack([grid, - pt[planesi]]) if grid.size else pt[planesi] + grid = numpy.hstack([grid, pt[planesi]]) if grid.size else pt[planesi] ptmp = (pt.T + offset).T - survived[istracked] = get_survived(ptmp, newring, nturns, - use_mp, **kwargs) + survived[istracked] = get_survived(ptmp, newring, nturns, use_mp, **kwargs) pm = part[:, numpy.logical_and(istracked, survived)] - mask = numpy.hstack([mask, - pm[planesi]]) if mask.size else pm[planesi] + mask = numpy.hstack([mask, pm[planesi]]) if mask.size else pm[planesi] for i in range(len(angles)): if not survived[i] and fact[i] > ftol: - deltas = cs[:, i]*rsteps[:]*min(1, 2*fact[i]) + deltas = cs[:, i] * rsteps[:] * min(1, 2 * fact[i]) if numpy.any(abs(deltas) > abs(part[planesi, i])): part[planesi, i] = numpy.zeros(len(planesi)) else: for j, pi in enumerate(planesi): part[pi, i] -= deltas[j] survived[i] = True - fact[i] *= 1/divider + fact[i] *= 1 / divider for i, pi in enumerate(planesi): - part[pi] -= cs[i]*rsteps[i]*fact + part[pi] -= cs[i] * rsteps[i] * fact p = numpy.squeeze(part[planesi]) return p, mask, grid offset, newring = set_ring_orbit(ring, dp, obspt, offset) - config = grid_configuration(planes, npoints, amplitudes, - GridMode.RECURSIVE, bounds=bounds, - shift_zero=shift_zero) - rtol = min(numpy.atleast_1d(config.amplitudes/config.shape)) + config = grid_configuration( + planes, + npoints, + amplitudes, + GridMode.RECURSIVE, + bounds=bounds, + shift_zero=shift_zero, + ) + rtol = min(numpy.atleast_1d(config.amplitudes / config.shape)) rstep = config.amplitudes if len(numpy.atleast_1d(config.shape)) == 2: angles = numpy.linspace(*config.bounds[1], config.shape[1]) @@ -384,54 +464,109 @@ def search_boundary(planesi, angles, rtol, rsteps, nturns, angles = numpy.atleast_1d(angles) if verbose: - print('\nRunning recursive boundary search:') + print("\nRunning recursive boundary search:") if obspt is None: - print('Element {0}, obspt={1}'.format(ring[0].FamName, 0)) + print("Element {0}, obspt={1}".format(ring[0].FamName, 0)) else: - print('Element {0}, obspt={1}'.format(ring[obspt].FamName, - obspt)) - print('The grid mode is {0}'.format(config.mode)) - print('The planes are {0}'.format(config.planes)) - print('Number of angles is {0} from {1} to {2} rad'.format(len(angles), - angles[0], angles[-1])) - print('The resolution of the search is {0}'.format(rtol)) - print('The initial step size is {0}'.format(rstep)) - print('The initial offset is {0} with dp={1}'.format(offset, dp)) + print("Element {0}, obspt={1}".format(ring[obspt].FamName, obspt)) + print("The grid mode is {0}".format(config.mode)) + print("The planes are {0}".format(config.planes)) + print( + "Number of angles is {0} from {1} to {2} rad".format( + len(angles), angles[0], angles[-1] + ) + ) + print("The resolution of the search is {0}".format(rtol)) + print("The initial step size is {0}".format(rstep)) + print("The initial offset is {0} with dp={1}".format(offset, dp)) t0 = time.time() - result = search_boundary(config.planesi, angles, rtol, rstep, - nturns, offset, use_mp, **kwargs) + result = search_boundary( + config.planesi, angles, rtol, rstep, nturns, offset, use_mp, **kwargs + ) if verbose: - print('Calculation took {0}'.format(time.time()-t0)) + print("Calculation took {0}".format(time.time() - t0)) return result -def boundary_search(ring: Lattice, planes, npoints, amplitudes, - nturns: Optional[int] = 1024, - obspt: Optional[int] = None, dp: Optional[float] = None, - offset: Sequence[float] = None, bounds=None, - grid_mode: Optional[GridMode] = GridMode.RADIAL, - use_mp: Optional[bool] = False, - verbose: Optional[bool] = True, - shift_zero: Optional[float] = 1.0e-9, - **kwargs): +def boundary_search( + ring: Lattice, + planes, + npoints, + amplitudes, + nturns: Optional[int] = 1024, + obspt: Optional[Refpts] = None, + dp: Optional[float] = None, + offset: Sequence[float] = None, + bounds=None, + grid_mode: Optional[GridMode] = GridMode.RADIAL, + use_mp: Optional[bool] = False, + verbose: Optional[bool] = True, + shift_zero: Optional[float] = 1.0e-9, + **kwargs, +): """ Computes the loss boundary at a single point in the machine """ - divider = kwargs.pop('divider', 2) + if obspt is not None: + rp = ring.uint32_refpts(obspt) + else: + rp = numpy.atleast_1d(obspt) + if offset is not None: + try: + offset = numpy.broadcast_to(offset, (len(rp), 6)) + except ValueError: + msg = "offset and refpts have incoherent " "shapes: {0}, {1}".format( + numpy.shape(offset), numpy.shape(obspt) + ) + raise AtError(msg) + else: + offset = [None for _ in rp] + + divider = kwargs.pop("divider", 2) if grid_mode is GridMode.RECURSIVE: - result = recursive_boundary_search(ring, planes, npoints, amplitudes, - nturns=nturns, obspt=obspt, dp=dp, - offset=offset, bounds=bounds, - use_mp=use_mp, verbose=verbose, - divider=divider, - shift_zero=shift_zero, - **kwargs) + boundary = [] + survived = [] + grid = [] + for r, o in zip(rp, offset): + b, s, g = recursive_boundary_search( + ring, + planes, + npoints, + amplitudes, + nturns=nturns, + obspt=r, + dp=dp, + offset=o, + bounds=bounds, + use_mp=use_mp, + verbose=verbose, + divider=divider, + shift_zero=shift_zero, + **kwargs, + ) + boundary.append(b) + survived.append(s) + grid.append(g) + if len(rp) == 1: + result = (boundary[0], survived[0], grid[0]) + else: + result = (boundary, survived, grid) else: - result = grid_boundary_search(ring, planes, npoints, amplitudes, - nturns=nturns, obspt=obspt, dp=dp, - offset=offset, bounds=bounds, - grid_mode=grid_mode, use_mp=use_mp, - verbose=verbose, shift_zero=shift_zero, - **kwargs) + result = grid_boundary_search( + ring, + planes, + npoints, + amplitudes, + nturns=nturns, + obspt=rp, + dp=dp, + offset=offset, + bounds=bounds, + grid_mode=grid_mode, + use_mp=use_mp, + verbose=verbose, + shift_zero=shift_zero, + **kwargs, + ) return result diff --git a/pyat/at/acceptance/touschek.py b/pyat/at/acceptance/touschek.py index d31a7c6740..58cd9d56fd 100644 --- a/pyat/at/acceptance/touschek.py +++ b/pyat/at/acceptance/touschek.py @@ -12,7 +12,7 @@ from ..constants import qe, clight, _e_radius -__all__ = ['get_bunch_length_espread', 'get_lifetime', 'get_scattering_rate'] +__all__ = ["get_bunch_length_espread", "get_lifetime", "get_scattering_rate"] def get_bunch_length_espread(ring, zn=None, bunch_curr=None, espread=None): @@ -34,8 +34,9 @@ def get_bunch_length_espread(ring, zn=None, bunch_curr=None, espread=None): Returns: Bunch length, energy spread """ + def haissinski(x, cst): - return x**3-x-cst + return x**3 - x - cst ep = at.envelope_parameters(ring.radiation_on(copy=True)) bl0 = ep.sigma_l @@ -44,28 +45,33 @@ def haissinski(x, cst): if zn is None and bunch_curr is None: bl = bl0 elif zn is None or bunch_curr is None: - raise AtError('Please provide both current and Z/n for bunch ' - 'length calculation') + raise AtError( + "Please provide both current and Z/n for bunch " "length calculation" + ) else: vrf = ring.get_rf_voltage() h = ring.harmonic_number etac = at.get_slip_factor(ring.radiation_off(copy=True)) cs = numpy.cos(ep.phi_s) - nus = ep.f_s/ring.revolution_frequency - cst = (-0.5 * numpy.sqrt(numpy.pi) * bunch_curr * zn / - (vrf * h * cs * (abs(etac) * espread / nus)**3)) - bl = bl0*fsolve(haissinski, numpy.array([1.0]), args=cst)[0] + nus = ep.f_s / ring.revolution_frequency + cst = ( + -0.5 + * numpy.sqrt(numpy.pi) + * bunch_curr + * zn + / (vrf * h * cs * (abs(etac) * espread / nus) ** 3) + ) + bl = bl0 * fsolve(haissinski, numpy.array([1.0]), args=cst)[0] return bl, espread -def get_beam_sizes(ring, bunch_curr, zn=None, emitx=None, - sigs=None, sigp=None): +def get_beam_sizes(ring, bunch_curr, zn=None, emitx=None, sigs=None, sigp=None): if zn is None: bunch_curr = None if sigs is None or sigp is None: - sigsi, sigpi = get_bunch_length_espread(ring, zn=zn, - bunch_curr=bunch_curr, - espread=sigp) + sigsi, sigpi = get_bunch_length_espread( + ring, zn=zn, bunch_curr=bunch_curr, espread=sigp + ) if sigs is None: sigs = sigsi if sigp is None: @@ -84,77 +90,127 @@ def int_piwinski(k, km, B1, B2): is substituted by its exponential approximation: :math:`I_0(x)~\frac{\exp(x)}{\sqrt{2 \pi x}}` """ - t = numpy.tan(k)**2 - tm = numpy.tan(km)**2 - fact = ((2*t+1)**2*(t/tm/(1+t)-1)/t + t - numpy.sqrt(t*tm*(1+t)) - - (2+1/(2*t))*numpy.log(t/tm/(1+t))) - if B2*t < 500: - intp = fact * numpy.exp(-B1*t)*iv(0, B2*t)*numpy.sqrt(1+t) + t = numpy.tan(k) ** 2 + tm = numpy.tan(km) ** 2 + fact = ( + (2 * t + 1) ** 2 * (t / tm / (1 + t) - 1) / t + + t + - numpy.sqrt(t * tm * (1 + t)) + - (2 + 1 / (2 * t)) * numpy.log(t / tm / (1 + t)) + ) + if B2 * t < 500: + intp = fact * numpy.exp(-B1 * t) * iv(0, B2 * t) * numpy.sqrt(1 + t) else: - intp = fact * numpy.exp(B2*t-B1*t) / \ - numpy.sqrt(2*numpy.pi*B2*t)*numpy.sqrt(1+t) + intp = ( + fact + * numpy.exp(B2 * t - B1 * t) + / numpy.sqrt(2 * numpy.pi * B2 * t) + * numpy.sqrt(1 + t) + ) return intp -def _get_vals(ring, rp, ma, emity, bunch_curr, emitx=None, - sigs=None, sigp=None, zn=None, epsabs=1.0e-16, - epsrel=1.0e-12): - - emitx, sigs, sigp = get_beam_sizes(ring, bunch_curr, zn=zn, - emitx=emitx, sigs=sigs, - sigp=sigp) - - nc = bunch_curr/ring.revolution_frequency/qe - beta2 = ring.beta*ring.beta - gamma2 = ring.gamma*ring.gamma +def _get_vals( + ring, + rp, + ma, + emity, + bunch_curr, + emitx=None, + sigs=None, + sigp=None, + zn=None, + epsabs=1.0e-16, + epsrel=1.0e-12, +): + + emitx, sigs, sigp = get_beam_sizes( + ring, bunch_curr, zn=zn, emitx=emitx, sigs=sigs, sigp=sigp + ) + + nc = bunch_curr / ring.revolution_frequency / qe + beta2 = ring.beta * ring.beta + gamma2 = ring.gamma * ring.gamma emit = numpy.array([emitx, emity]) _, _, ld = ring.get_optics(refpts=rp) bxy = ld.beta - bxy2 = bxy*bxy + bxy2 = bxy * bxy axy = ld.alpha dxy = ld.dispersion[:, [0, 2]] - dxy2 = dxy*dxy + dxy2 = dxy * dxy dpxy = ld.dispersion[:, [1, 3]] - sigb = numpy.sqrt(bxy*emit) - sigb2 = sigb*sigb - sigp2 = sigp*sigp - sig = numpy.sqrt(sigb2+sigp2*dxy2) - sig2 = sig*sig - dt = dxy*axy+dpxy*bxy - dt2 = dt*dt - sigh2 = 1/(1/sigp2 + numpy.sum((dxy2+dt2)/sigb2, axis=1)) - - bs = bxy2/sigb2*(1-(sigh2*(dt2/sigb2).T).T) - bg2i = 1/(2*beta2*gamma2) - B1 = bg2i*numpy.sum(bs, axis=1) - B2sq = bg2i*bg2i*(numpy.diff(bs, axis=1).T**2 + - 4*sigh2*sigh2*numpy.prod(bxy2*dt2, axis=1) / - numpy.prod(sigb2*sigb2, axis=1)) + sigb = numpy.sqrt(bxy * emit) + sigb2 = sigb * sigb + sigp2 = sigp * sigp + sig = numpy.sqrt(sigb2 + sigp2 * dxy2) + sig2 = sig * sig + dt = dxy * axy + dpxy * bxy + dt2 = dt * dt + sigh2 = 1 / (1 / sigp2 + numpy.sum((dxy2 + dt2) / sigb2, axis=1)) + + bs = bxy2 / sigb2 * (1 - (sigh2 * (dt2 / sigb2).T).T) + bg2i = 1 / (2 * beta2 * gamma2) + B1 = bg2i * numpy.sum(bs, axis=1) + B2sq = ( + bg2i + * bg2i + * ( + numpy.diff(bs, axis=1).T ** 2 + + 4 + * sigh2 + * sigh2 + * numpy.prod(bxy2 * dt2, axis=1) + / numpy.prod(sigb2 * sigb2, axis=1) + ) + ) B2 = numpy.squeeze(numpy.sqrt(B2sq)) val = numpy.zeros((2, len(rp))) for i in range(2): dpp = ma[:, i] - um = beta2*dpp*dpp + um = beta2 * dpp * dpp km = numpy.arctan(numpy.sqrt(um)) for ii in range(len(rp)): args = (km[ii], B1[ii], B2[ii]) - val[i, ii], *_ = integrate.quad(int_piwinski, args[0], numpy.pi/2, - args=args, epsabs=epsabs, - epsrel=epsrel) - - val[i] *= (_e_radius**2*clight*nc / - (8*numpy.pi*gamma2*sigs * - numpy.sqrt(numpy.prod(sig2, axis=1) - - sigp2*sigp2*numpy.prod(dxy2, axis=1))) * - 2*numpy.sqrt(numpy.pi*(B1*B1-B2*B2))) + val[i, ii], *_ = integrate.quad( + int_piwinski, + args[0], + numpy.pi / 2, + args=args, + epsabs=epsabs, + epsrel=epsrel, + ) + + val[i] *= ( + _e_radius**2 + * clight + * nc + / ( + 8 + * numpy.pi + * gamma2 + * sigs + * numpy.sqrt( + numpy.prod(sig2, axis=1) - sigp2 * sigp2 * numpy.prod(dxy2, axis=1) + ) + ) + * 2 + * numpy.sqrt(numpy.pi * (B1 * B1 - B2 * B2)) + ) return val -def _init_ma_rp(ring, refpts=None, offset=None, momap=None, - interpolate=True, check_zero=True, **kwargs): +def _init_ma_rp( + ring, + refpts=None, + offset=None, + momap=None, + interpolate=True, + check_zero=True, + **kwargs, +): if refpts is None: refpts = ring.get_uint32_index(at.All, endpoint=False) else: @@ -164,13 +220,15 @@ def _init_ma_rp(ring, refpts=None, offset=None, momap=None, try: offset = numpy.broadcast_to(offset, (len(refpts), 6)) except ValueError: - msg = ('offset and refpts have incoherent ' - 'shapes: {0}, {1}'.format(offset.shape, refpts.shape)) + msg = "offset and refpts have incoherent " "shapes: {0}, {1}".format( + offset.shape, refpts.shape + ) raise AtError(msg) if momap is not None: - assert len(momap) == len(refpts), \ - 'Input momap and refpts have different lengths' + assert len(momap) == len( + refpts + ), "Input momap and refpts have different lengths" if check_zero: mask = [ring[r].Length > 0.0 for r in refpts] @@ -178,8 +236,7 @@ def _init_ma_rp(ring, refpts=None, offset=None, momap=None, mask = [True for _ in refpts] if not numpy.all(mask): - zerolength_warning = ('zero-length elements removed ' - 'from lifetime calculation') + zerolength_warning = "zero-length elements removed " "from lifetime calculation" warnings.warn(AtWarning(zerolength_warning)) refpts = refpts[mask] @@ -187,19 +244,18 @@ def _init_ma_rp(ring, refpts=None, offset=None, momap=None, offset = offset[mask] if momap is None: - resolution = kwargs.pop('resolution', 1.0e-3) - amplitude = kwargs.pop('amplitude', 0.1) - kwargs.update({'refpts': refpts}) - kwargs.update({'offset': offset}) - momap, _, _ = ring.get_momentum_acceptance(resolution, - amplitude, **kwargs) + resolution = kwargs.pop("resolution", 1.0e-3) + amplitude = kwargs.pop("amplitude", 0.1) + kwargs.update({"refpts": refpts}) + kwargs.update({"offset": offset}) + momap, _, _ = ring.get_momentum_acceptance(resolution, amplitude, **kwargs) else: momap = momap[mask] if interpolate: - refpts_all = numpy.array([i for i in range(refpts[0], - refpts[-1]+1) - if ring[i].Length > 0]) + refpts_all = numpy.array( + [i for i in range(refpts[0], refpts[-1] + 1) if ring[i].Length > 0] + ) spos = numpy.squeeze(ring.get_s_pos(refpts)) spos_all = numpy.squeeze(ring.get_s_pos(refpts_all)) momp = numpy.interp(spos_all, spos, momap[:, 0]) @@ -211,8 +267,19 @@ def _init_ma_rp(ring, refpts=None, offset=None, momap=None, return ma, rp -def get_lifetime(ring, emity, bunch_curr, emitx=None, sigs=None, sigp=None, - zn=None, momap=None, refpts=None, offset=None, **kwargs): +def get_lifetime( + ring, + emity, + bunch_curr, + emitx=None, + sigs=None, + sigp=None, + zn=None, + momap=None, + refpts=None, + offset=None, + **kwargs, +): """Touschek lifetime calculation Computes the touschek lifetime using the Piwinski formula @@ -254,24 +321,60 @@ def get_lifetime(ring, emity, bunch_curr, emitx=None, sigs=None, sigp=None, ma: momentum aperture (len(refpts), 2) array refpts: refpts used for momentum aperture calculation (len(refpts), ) array + + .. note:: + + * When``use_mp=True`` all the available CPUs will be used. + This behavior can be changed by setting + ``at.DConstant.patpass_poolsize`` to the desired value + * When multiple ``refpts`` are provided particles are first + projected to the beginning of the ring with tracking. Then, + all particles are tracked up to ``nturns``. This allows to + do most of the work in a single function call and allows for + full parallelization. """ - interpolate = kwargs.pop('interpolate', True) - epsabs = kwargs.pop('epsabs', 1.0e-16) - epsrel = kwargs.pop('epsrel', 1.0e-12) - ma, rp = _init_ma_rp(ring, refpts=refpts, offset=offset, - momap=momap, interpolate=interpolate, - **kwargs) + interpolate = kwargs.pop("interpolate", True) + epsabs = kwargs.pop("epsabs", 1.0e-16) + epsrel = kwargs.pop("epsrel", 1.0e-12) + ma, rp = _init_ma_rp( + ring, + refpts=refpts, + offset=offset, + momap=momap, + interpolate=interpolate, + **kwargs, + ) length_all = numpy.array([e.Length for e in ring[rp]]) - vals = _get_vals(ring, rp, ma, emity, bunch_curr, emitx=emitx, - sigs=sigs, sigp=sigp, zn=zn, epsabs=epsabs, - epsrel=epsrel) - tl = 1/numpy.mean([sum(v*length_all.T)/sum(length_all) for v in vals]) + vals = _get_vals( + ring, + rp, + ma, + emity, + bunch_curr, + emitx=emitx, + sigs=sigs, + sigp=sigp, + zn=zn, + epsabs=epsabs, + epsrel=epsrel, + ) + tl = 1 / numpy.mean([sum(v * length_all.T) / sum(length_all) for v in vals]) return tl, ma, rp -def get_scattering_rate(ring, emity, bunch_curr, emitx=None, sigs=None, - sigp=None, zn=None, momap=None, refpts=None, - offset=None, **kwargs): +def get_scattering_rate( + ring, + emity, + bunch_curr, + emitx=None, + sigs=None, + sigp=None, + zn=None, + momap=None, + refpts=None, + offset=None, + **kwargs, +): """Touschek scattering rate calculation Computes the touschek scattering using the Piwinski formula @@ -314,17 +417,34 @@ def get_scattering_rate(ring, emity, bunch_curr, emitx=None, sigs=None, refpts: refpts used for momentum aperture calculation (len(refpts), ) array """ - interpolate = kwargs.pop('interpolate', True) - epsabs = kwargs.pop('epsabs', 1.0e-16) - epsrel = kwargs.pop('epsrel', 1.0e-12) - ma, rp = _init_ma_rp(ring, refpts=refpts, momap=momap, - interpolate=interpolate, check_zero=False, - offset=offset, **kwargs) - vals = _get_vals(ring, rp, ma, emity, bunch_curr, emitx=emitx, - sigs=sigs, sigp=sigp, zn=zn, epsabs=epsabs, - epsrel=epsrel) - scattering_rate = (numpy.mean(vals, axis=0)*bunch_curr / - ring.revolution_frequency / qe) + interpolate = kwargs.pop("interpolate", True) + epsabs = kwargs.pop("epsabs", 1.0e-16) + epsrel = kwargs.pop("epsrel", 1.0e-12) + ma, rp = _init_ma_rp( + ring, + refpts=refpts, + momap=momap, + interpolate=interpolate, + check_zero=False, + offset=offset, + **kwargs, + ) + vals = _get_vals( + ring, + rp, + ma, + emity, + bunch_curr, + emitx=emitx, + sigs=sigs, + sigp=sigp, + zn=zn, + epsabs=epsabs, + epsrel=epsrel, + ) + scattering_rate = ( + numpy.mean(vals, axis=0) * bunch_curr / ring.revolution_frequency / qe + ) return scattering_rate, ma, rp