Update internals to vectorised SciPy #94
Changes from 20 commits
@@ -41,6 +41,7 @@
from obspy import Trace, Catalog, UTCDateTime, Stream, read, read_events
from obspy.core.event import Event, Pick, CreationInfo, ResourceIdentifier
from obspy.core.event import Comment, WaveformStreamID
from scipy.fftpack import next_fast_len

from eqcorrscan.utils.timer import Timer
from eqcorrscan.utils.findpeaks import find_peaks2_short, decluster

@@ -51,6 +52,31 @@
from eqcorrscan.core.lag_calc import lag_calc


def _spike_test(stream, percent=0.99, multiplier=1e6):
    """
    Check for very large spikes in data and raise an error if found.

    :param stream: Stream to look for spikes in.
    :type stream: :class:`obspy.core.stream.Stream`
    :param percent: Percentage as a decimal to calculate range for.
    :type percent: float
    :param multiplier: Multiplier of range to define a spike.
    :type multiplier: float
    """
    for tr in stream:
        if (tr.data > 2 * np.max(np.sort(
                np.abs(tr.data))[0:int(percent * len(tr.data))]) *
                multiplier).sum() > 0:
            msg = ('Spikes above ' + str(multiplier) +
                   ' of the range of ' + str(percent) +
                   ' of the data present, check.\n' +
                   'This would otherwise likely result in an issue during ' +
                   'FFT prior to cross-correlation.\n' +
                   'If you think this spike is real please report ' +
                   'this as a bug.')
            raise MatchFilterError(msg)


class MatchFilterError(Exception):
    """
    Default error for match-filter errors.
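The criterion used by `_spike_test` can be exercised on a bare numpy array. `has_spike` below is a hypothetical stand-alone rewrite of the same check for illustration; it is not part of the eqcorrscan API:

```python
import numpy as np

def has_spike(data, percent=0.99, multiplier=1e6):
    # Largest absolute amplitude within the lowest `percent` of samples
    baseline = np.max(np.sort(np.abs(data))[0:int(percent * len(data))])
    # Any sample exceeding 2 * baseline * multiplier counts as a spike
    return bool((data > 2 * baseline * multiplier).sum() > 0)

clean = np.random.RandomState(42).randn(1000)
spiked = clean.copy()
spiked[500] = 1e9  # inject an unphysically large sample
```

Here `has_spike(clean)` is False while `has_spike(spiked)` is True, which mirrors the `MatchFilterError` path above.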
@@ -3360,6 +3386,170 @@ def normxcorr2(template, image):
    return ccc


def multi_normxcorr(templates, stream, pads):
    """
    Compute the normalized cross-correlation of multiple templates with data.

    :param templates: 2D array of templates
    :type templates: np.ndarray
    :param stream: 1D array of continuous data
    :type stream: np.ndarray
    :param pads: List of ints of pad lengths in the same order as templates
    :type pads: list

    :return: np.ndarray of cross-correlations and np.ndarray of used channels
    """
    # TODO: Try other fft methods: pyfftw?
    import bottleneck
    from scipy.signal.signaltools import _centered
    from scipy.fftpack.helper import next_fast_len

    # Generate a template mask
    used_chans = ~np.isnan(templates).any(axis=1)
    # Currently have to use float64 as bottleneck runs into issues with other
    # types: https://github.com/kwgoodman/bottleneck/issues/164
    stream = stream.astype(np.float64)
    templates = templates.astype(np.float64)
    template_length = templates.shape[1]
    stream_length = len(stream)
    fftshape = next_fast_len(template_length + stream_length - 1)
    # Set up normalizers
    stream_mean_array = bottleneck.move_mean(
        stream, template_length)[template_length - 1:]
    stream_std_array = bottleneck.move_std(
        stream, template_length)[template_length - 1:]
    # Normalize and flip the templates
    norm = ((templates - templates.mean(axis=-1, keepdims=True)) / (
        templates.std(axis=-1, keepdims=True) * template_length))
    norm_sum = norm.sum(axis=-1, keepdims=True)
    stream_fft = np.fft.rfft(stream, fftshape)
    template_fft = np.fft.rfft(np.flip(norm, axis=-1), fftshape, axis=-1)

Review comment: Numpy's flip was added in version 1.12.0, so we need to make sure and bump the version in setup.py.

Review comment: Obspy 1.0.3 does not play nice on Windows, but the current master does. I'm going to keep appveyor running the obspy master and travis running the current release until the next obspy release, when appveyor should revert to the current obspy release. I think it comes down to their pinning of matplotlib?
    res = np.fft.irfft(template_fft * stream_fft,
                       fftshape)[:, 0:template_length + stream_length - 1]
    res = ((_centered(res, stream_length - template_length + 1)) -
           norm_sum * stream_mean_array) / stream_std_array
    for i in range(len(pads)):
        # This is a hack from padding templates with nan data
        if np.isnan(res[i]).all():
            res[i] = np.zeros(len(res[i]))
        else:
            res[i] = np.append(res[i], np.zeros(pads[i]))[pads[i]:]
    return res.astype(np.float32), used_chans

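The frequency-domain pipeline above (normalise the template, correlate via rfft/irfft, then correct with the moving mean and standard deviation of the stream) can be checked against the textbook definition on a single channel. This sketch uses numpy's `sliding_window_view` in place of bottleneck's moving statistics and skips `next_fast_len`; it illustrates the technique and is not the eqcorrscan implementation:

```python
import numpy as np

def normxcorr_fft(template, data):
    """Normalised cross-correlation of one template against one channel."""
    n_t, n_d = len(template), len(data)
    fftshape = n_t + n_d - 1  # next_fast_len(fftshape) would pad for speed
    # Normalise the template as in multi_normxcorr
    norm = (template - template.mean()) / (template.std() * n_t)
    # Correlation is convolution with the time-reversed template
    template_fft = np.fft.rfft(norm[::-1], fftshape)
    data_fft = np.fft.rfft(data, fftshape)
    full = np.fft.irfft(template_fft * data_fft, fftshape)
    valid = full[n_t - 1:n_d]  # the fully-overlapping part of the correlation
    # Moving mean and std of the data over the template window
    windows = np.lib.stride_tricks.sliding_window_view(data, n_t)
    move_mean = windows.mean(axis=-1)
    move_std = windows.std(axis=-1)
    return (valid - norm.sum() * move_mean) / move_std

rng = np.random.RandomState(0)
data = rng.randn(200)
template = data[50:80].copy()  # template is an exact slice of the data
cc = normxcorr_fft(template, data)
```

Because the template is cut directly from the data, the correlation peaks at 1.0 (to floating-point precision) at sample 50.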
def multichannel_xcorr(templates, stream, use_dask=False, compute=True,
                       cores=1):
    """
    Cross-correlate multiple channels either in parallel or not.

    :type templates: list
    :param templates:
        A list of templates, where each one should be an obspy.Stream object
        containing multiple traces of seismic data and the relevant header
        information.
    :type stream: obspy.core.stream.Stream
    :param stream:
        A single Stream object to be correlated with the templates. This is
        in effect the image in normxcorr2 and cv2.
    :type use_dask: bool
    :param use_dask:
        Whether to use dask for multiprocessing; if False, will use Python's
        native multiprocessing.
    :type compute: bool
    :param compute:
        Only valid if use_dask=True. If compute=False, the returned result
        will be a dask.delayed object, useful if you are using dask to
        compute multiple time-steps at the same time.
    :type cores: int
    :param cores:
        Number of processes to use; if set to None and use_dask=False, no
        multiprocessing will be done.

    :returns:
        New list of :class:`numpy.ndarray` objects. These will contain
        the correlation sums for each template for this day of data.
    :rtype: list
    :returns:
        List of ints as number of channels used for each cross-correlation.
    :rtype: list
    :returns:
        List of lists of tuples of station, channel for all
        cross-correlations.
    :rtype: list

    .. Note::
        Each template must contain the same channels as every other template,
        the stream must also contain the same channels (note that if there
        are duplicate channels in the template you do not need duplicate
        channels in the stream).
    """
    no_chans = np.zeros(len(templates))
    chans = [[] for _i in range(len(templates))]
    # Do some reshaping
    stream.sort(['network', 'station', 'location', 'channel'])
    t_starts = []
    for template in templates:
        template.sort(['network', 'station', 'location', 'channel'])
        t_starts.append(min([tr.stats.starttime for tr in template]))
    seed_ids = [tr.id + '_' + str(i) for i, tr in enumerate(templates[0])]
    template_array = {}
    stream_array = {}
    pad_array = {}
    for i, seed_id in enumerate(seed_ids):
        t_ar = np.array([template[i].data for template in templates])
        template_array.update({seed_id: t_ar})
        stream_array.update(
            {seed_id: stream.select(id=seed_id.split('_')[0])[0].data})
        pad_list = [
            int(round(template[i].stats.sampling_rate *
                      (template[i].stats.starttime - t_starts[j])))
            for j, template in zip(range(len(templates)), templates)]
        pad_array.update({seed_id: pad_list})
    # if use_dask:
    #     import dask
    #     xcorrs = []
    #     for seed_id in seed_ids:
    #         tr_xcorrs, tr_chans = dask.delayed(multi_normxcorr)(
    #             templates=template_array[seed_id],
    #             stream=stream.select(id=seed_id.split('_')[0])[0].data)
    #         xcorrs.append(tr_xcorrs)
    #     cccsums = dask.delayed(np.sum)(xcorrs, axis=0)
    #     if compute:
    #         cccsums.compute()
    if cores is None:

Review comment: Would it make sense to abstract the multiprocessing further up? I know several functions use something similar, so maybe we could make a generic pool interface on the module level; then we could have persistent processes/threads in the pool so we wouldn't need to spin them up every time.
        cccsums = np.zeros([len(templates),
                            len(stream[0]) - len(templates[0][0]) + 1])
        for seed_id in seed_ids:
            tr_xcorrs, tr_chans = multi_normxcorr(
                templates=template_array[seed_id],
                stream=stream_array[seed_id], pads=pad_array[seed_id])
            cccsums = np.sum([cccsums, tr_xcorrs], axis=0)
            no_chans += tr_chans.astype(np.int)
            for chan, state in zip(chans, tr_chans):
                if state:
                    chan.append((seed_id.split('.')[1],
                                 seed_id.split('.')[-1].split('_')[0]))
    else:
        pool = Pool(processes=cores)
        results = [pool.apply_async(
            multi_normxcorr, (template_array[seed_id], stream_array[seed_id],
                              pad_array[seed_id]))
            for seed_id in seed_ids]
        pool.close()
        results = [p.get() for p in results]
        xcorrs = [p[0] for p in results]
        tr_chans = np.array([p[1] for p in results])
        pool.join()
        cccsums = np.sum(xcorrs, axis=0)
        no_chans = np.sum(tr_chans.astype(np.int), axis=0)
        for seed_id, tr_chan in zip(seed_ids, tr_chans):
            for chan, state in zip(chans, tr_chan):
                if state:
                    chan.append((seed_id.split('.')[1],
                                 seed_id.split('.')[-1].split('_')[0]))
    return cccsums, no_chans, chans

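The pad handling at the end of `multi_normxcorr` (append zeros, then slice) shifts each channel's correlation back by its offset from the earliest channel. In isolation, with made-up numbers:

```python
import numpy as np

# Correlation for a channel whose template window starts 2 samples after
# the earliest channel in the template
cc = np.array([0.1, 0.0, 0.2, 0.9, 0.0, 0.0], dtype=np.float32)
pad = 2

# Append `pad` zeros then drop the first `pad` samples: the peak moves
# from index 3 to index 1, aligning it with the common template origin
shifted = np.append(cc, np.zeros(pad, dtype=cc.dtype))[pad:]
```

After this shift, summing the per-channel arrays into `cccsums` adds the peaks from all channels at the same sample.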
def _template_loop(template, chan, stream_ind, debug=0, i=0):
    """
    Handle individual template correlations.

@@ -3421,7 +3611,7 @@ def _template_loop(template, chan, stream_ind, debug=0, i=0):
    return i, ccc


-def _channel_loop(templates, stream, cores=1, debug=0, internal=True):
+def _channel_loop(templates, stream, cores=1, debug=0):
    """
    Internal loop for parallel processing.

@@ -3442,10 +3632,6 @@ def _channel_loop(templates, stream, cores=1, debug=0):
    :param cores: Number of cores to loop over
    :type debug: int
    :param debug: Debug level.
-    :type internal: bool
-    :param internal:
-        Whether to use the internal Python code (True) or the experimental
-        compiled code.

    :returns:
        New list of :class:`numpy.ndarray` objects. These will contain

@@ -3464,8 +3650,6 @@ def _channel_loop(templates, stream, cores=1, debug=0):
        are duplicate channels in the template you do not need duplicate
        channels in the stream).
    """
-    if not internal:
-        print('Not yet coded')
    num_cores = cores
    if len(templates) < num_cores:
        num_cores = len(templates)

@@ -3809,7 +3993,7 @@ def match_filter(template_names, template_list, st, threshold,
        raise MatchFilterError(msg)
    outtic = time.clock()
    if debug >= 2:
-        print('Ensuring all template channels have matches in long data')
+        print('Ensuring all template channels have matches in continuous data')
    template_stachan = {}
    # Work out what station-channel pairs are in the templates, including
    # duplicate station-channel pairs. We will use this information to fill

@@ -3908,9 +4092,8 @@ def match_filter(template_names, template_list, st, threshold,
        for template in templates:
            print(template)
        print(stream)
-    [cccsums, no_chans, chans] = _channel_loop(
-        templates=templates, stream=stream, cores=cores, debug=debug,
-        internal=internal)
+    [cccsums, no_chans, chans] = multichannel_xcorr(
+        templates=templates, stream=stream, cores=cores)
    if len(cccsums[0]) == 0:
        raise MatchFilterError('Correlation has not run, zero length cccsum')
    outtoc = time.clock()

@@ -4029,31 +4212,6 @@ def match_filter(template_names, template_list, st, threshold,
    return detections, det_cat, detection_streams

-def _spike_test(stream, percent=0.99, multiplier=1e6):
-    """
-    Check for very large spikes in data and raise an error if found.
-
-    :param stream: Stream to look for spikes in.
-    :type stream: :class:`obspy.core.stream.Stream`
-    :param percent: Percentage as a decimal to calculate range for.
-    :type percent: float
-    :param multiplier: Multiplier of range to define a spike.
-    :type multiplier: float
-    """
-    for tr in stream:
-        if (tr.data > 2 * np.max(np.sort(
-                np.abs(tr.data))[0:int(percent * len(tr.data))]) *
-                multiplier).sum() > 0:
-            msg = ('Spikes above ' + str(multiplier) +
-                   ' of the range of ' + str(percent) +
-                   ' of the data present, check.\n' +
-                   'This would otherwise likely result in an issue during ' +
-                   'FFT prior to cross-correlation.\n' +
-                   'If you think this spike is real please report ' +
-                   'this as a bug.')
-            raise MatchFilterError(msg)


if __name__ == "__main__":
    import doctest
    doctest.testmod()

@@ -72,21 +72,20 @@
    if not READ_THE_DOCS:
        install_requires = ['numpy>=1.8.0', 'obspy>=1.0.0',
                            'matplotlib>=1.3.0', 'joblib>=0.8.4',
-                            'scipy>=0.14', 'multiprocessing',
-                            'LatLon', 'h5py', 'cython', 'bottleneck']
+                            'scipy>=0.18', 'LatLon', 'cython',
+                            'bottleneck', 'xarray']
    else:
        install_requires = ['numpy>=1.8.0', 'obspy>=1.0.0',
                            'matplotlib>=1.3.0', 'joblib>=0.8.4',
-                            'multiprocessing',
                            'LatLon']
else:
    if not READ_THE_DOCS:
-        install_requires = ['numpy>=1.8.0', 'obspy>=0.10.2',
+        install_requires = ['numpy>=1.8.0', 'obspy>=1.0.0',
                            'matplotlib>=1.3.0', 'joblib>=0.8.4',
-                            'scipy>=0.14', 'LatLon', 'h5py', 'cython',
-                            'bottleneck']
+                            'scipy>=0.18', 'LatLon', 'cython',
+                            'bottleneck', 'xarray']
    else:
-        install_requires = ['numpy>=1.8.0', 'obspy>=0.10.2',
+        install_requires = ['numpy>=1.8.0', 'obspy>=1.0.0',
                            'matplotlib>=1.3.0', 'joblib>=0.8.4',
                            'LatLon']
# install_requires.append('ConfigParser')

Review comment: As mentioned above, we need at least numpy 1.12.0 because we are using flip.

Reply: Thanks!
Review comment: Is this for avoiding bottleneck issue 164? It might be worth adding a bit to the docstring about why this function might be used.

Reply: It's not; this was for an issue from openCV correlations. I'm not sure whether it is needed for the scipy internals. I need to check that, and if it's not needed I should remove it: it's really slow and at least one user has complained!

Reply: So I have sped this up a lot, and it looks like it is needed to stabilise FFTs.