# Copyright 2022 Wasja Bloch, Pascal Audet
# This file is part of PyRaysum.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import numpy as np
from scipy import signal
from obspy import Trace, Stream, UTCDateTime
from numpy.fft import fft, ifft, fftshift
# Here to avoid circular import
_phnames = {1: "P", 2: "S", 3: "T", 4: "p", 5: "s", 6: "t"}
[docs]def read_traces(traces, rc, geometry, arrivals=None):
"""
Create a :class:`Result` from the array produced by :meth:`fraysum.run_bare()`
and :meth:`fraysum.run_full`.
Parameters:
rc (:class:`Control`):
Run-control parameters
geometry (:class:`Geometry`):
Ray parameters
arrivals (list):
Output of :meth:`read_arrivals`. List of arrival times, amplitudes, and
names
Returns:
list:
List of :class:`obspy.Stream` objects
"""
npts = rc.npts
dt = rc.dt
rot = rc.rot
shift = rc.shift
ntr = geometry.ntr
# Crop unused overhang of oversized fortran arrays
trs = [
traces[0, :npts, :ntr].reshape(npts * ntr, order="F"),
traces[1, :npts, :ntr].reshape(npts * ntr, order="F"),
traces[2, :npts, :ntr].reshape(npts * ntr, order="F"),
]
itr = np.array([npts * [tr] for tr in range(ntr)]).reshape(npts * ntr)
# Component names
if rot == 0:
# Rotate to seismometer convention
component = ["N", "E", "Z"]
order = [2, 0, 1]
elif rot == 1:
component = ["R", "T", "Z"]
order = [0, 1, 2]
elif rot == 2:
component = ["P", "V", "H"]
order = [0, 1, 2]
else:
raise (ValueError('Invalid value for "rot": Must be 0, 1, 2'))
taxis = np.arange(npts) * dt - shift
streams = []
for iitr in range(ntr):
# Split by trace ID
istr = itr == iitr
# Store into trace by component with stats information
stream = Stream()
for ic in order:
stats = {
"baz": geometry[iitr][0],
"slow": geometry[iitr][1],
"station": "",
"network": "",
"starttime": UTCDateTime(0),
"delta": dt,
"channel": "SY" + component[ic],
"taxis": taxis,
}
if arrivals:
stats.update(
{
"phase_times": arrivals[iitr][ic][0],
"phase_amplitudes": arrivals[iitr][ic][1],
"phase_descriptors": arrivals[iitr][ic][2],
"phase_names": arrivals[iitr][ic][3],
"conversion_names": arrivals[iitr][ic][4],
}
)
if rot == 0 and component[ic] != "Z":
# Raysum has z down, change here to z up
tr = Trace(data=-trs[ic][istr], header=stats)
tr.stats.phase_amplitudes *= -1
else:
tr = Trace(data=trs[ic][istr], header=stats)
stream.append(tr)
# Store into Stream object and append to list
streams.append(stream)
return streams
[docs]def read_arrivals(ttimes, amplitudes, phaselist, geometry):
"""
Convert the :const:`phaselist`, :const:`amplitude` and :const:`traveltime` output of
:meth:`fraysum.run_full()` to lists of phase arrival times, amplitudes, long phase
descriptors, short phase names, and conversion names.
Parameters:
ttimes (np.array):
Travel time array ...
amplitudes (np.array):
Amplitude array ...
phaselist (np.array):
Phase identifier array returned by :func:`fraysum.run_full`
geometry (:class:`prs.Geometry`):
Ray geometry
Returns:
list:
3-component phase arrival lists, where indices translate to:
* :const:`0`: phase arrival times
* :const:`1`: phase amplitudes
* :const:`2`: (long) phase descriptors, e.g. "1P0S"
* :const:`3`: (short) phase names, e.g. "PS"
* :const:`4`: (intermediate) conversion names, e.g. "P1S"
"""
dscrs = []
phnms = []
convs = []
for iph in range(len(phaselist[0, 0, :])):
dscr = ""
phnm = ""
conv = ""
for iseg in range(len(phaselist[:, 0, 0])):
if phaselist[iseg, 0, iph] == 0:
# No more reflections / conversions
dscrs.append(dscr)
phnms.append(phnm)
convs.append(conv)
break
phid = phaselist[iseg, 1, iph]
layn = phaselist[iseg, 0, iph] - 1 # Python indexing
phn = _phnames[phid]
dscr += str(layn) + phn
if phn.isupper():
# Upward-conversion at top of layer below
con = str(layn + 1) + phn
else:
con = str(layn) + phn
# Omit not-converted segment from phase name and conversions
try:
if phnm[-1] == phn:
phn = ""
con = ""
except IndexError:
con = ""
# Always prefix incoming wavetype to conversions
if iseg == 0:
con = phn
phnm += phn
conv += con
if phaselist[0, 0, iph + 1] == 0:
break
nphs = len(dscrs)
dscrs = np.array(dscrs)
phnms = np.array(phnms)
convs = np.array(convs)
tanss = []
for itr in range(geometry.ntr):
tans = []
for comp in range(len(amplitudes[:, 0, 0])):
amps = amplitudes[comp, :nphs, itr]
ia = abs(amps) > 1e-6 # Dismiss 0 amplitude arrivals
tans.append(
np.array(
[ttimes[:nphs, itr][ia], amps[ia], dscrs[ia], phnms[ia], convs[ia]],
dtype=object,
)
)
tanss.append(tans)
return tanss
[docs]def make_array(geometry, rc):
"""
Initialize array for `NumPy`-based post processing
Parameters:
geometry (:class:`prs.Geometry`):
Recording geometry
rc (:class:`prs.Control`):
Run-control parameters
Returns:
:const:`numpy.zeros((geometry.ntr, 2, rc.npts))`:
Array in shape to be used by :func:`filtered_rf_array` and
:func:`filtered_array`.
"""
return np.zeros((geometry.ntr, 2, rc.npts))
cached_coefficients = {}
def _get_cached_bandpass_coefs(order, corners):
# from pyrocko.Trace.filter
ck = (order, tuple(corners))
if ck not in cached_coefficients:
cached_coefficients[ck] = signal.butter(order, corners, btype="band")
return cached_coefficients[ck]
[docs]def filtered_rf_array(traces, rfarray, ntr, npts, dt, fmin, fmax):
"""
Fast, `NumPy`-based, receiver function computation and filtering of
:meth:`fraysum.run_bare()` output
Roughly equivalent to subsequent calls to :func:`read_traces()`,
:meth:`Result.calculate_rfs()` and :meth:`Result.filter()`, stripped down
for computational efficiency, for use in inversion/probabilistic approaches.
Parameters:
traces (np.ndarray):
Output of :meth:`fraysum.run_bare()`
rfarray (np.ndarray):
Initialized array of shape (ntr, 2, npts) to store output (See:
:func:`make_array()`)
ntr (int):
Number of traces (:attr:`Geometry.ntr`)
npts (int):
Number of points per trace (:attr:`Control.npts`)
dt (float):
Sampling interval (:attr:`Control.dt`)
fmin (float):
Lower bandpass frequency corner (Hz)
fmax (float):
Upper bandpass frequency corner (Hz)
Returns:
None:
None. Output is written to :const:`rfarray`
Warning:
Assumes PVH alignment (ray-polarization), i.e. :attr:`Control.rot="PVH"`.
"""
order = 2
def _bandpass(arr):
# from pyrocko.Trace.filter
(b, a) = _get_cached_bandpass_coefs(order, (2 * dt * fmin, 2 * dt * fmax))
arr -= np.mean(arr)
firstpass = signal.lfilter(b, a, arr)
return signal.lfilter(b, a, firstpass[::-1])[::-1]
# Crop unused overhang of oversized fortran arrays and transpose to
# [traces[components[samples]]] order
data = np.array(
[traces[0, :npts, :ntr], traces[1, :npts, :ntr], traces[2, :npts, :ntr]]
).transpose(2, 0, 1)
for n, trace in enumerate(data):
ft_ztr = fft(trace[0]) # P or R or N
ft_rfr = fft(trace[1]) # V or T or E
ft_rft = fft(trace[2]) # H or Z or Z
# assuming PVH:
rfarray[n, 0, :] = _bandpass(fftshift(np.real(ifft(np.divide(ft_rfr, ft_ztr)))))
rfarray[n, 1, :] = _bandpass(fftshift(np.real(ifft(np.divide(ft_rft, ft_ztr)))))
[docs]def filtered_array(traces, rfarray, ntr, npts, dt, fmin, fmax):
"""
Fast, `NumPy`-based filtering of :meth:`fraysum.run_bare()` output
Roughly equivalent to subsequent calls to :func:`read_traces()`, and
:meth:`Result.filter()`, stripped down for computational efficiency,
for use in inversion/probabilistic approaches.
Parameters:
traces (np.ndarray):
Output of :meth:`fraysum.run_bare()`
rfarray (np.ndarray):
Initialized array of shape (ntr, 2, npts) to store output (See:
:func:`make_array()`)
ntr (int):
Number of traces (:attr:`Geometry.ntr`)
npts (int):
Number of points per trace (:attr:`Control.npts`)
dt (float):
Sampling intervall (:attr:`Control.dt`)
fmin (float):
Lower bandpass frequency corner (Hz)
fmax (float):
Upper bandpass frequency corner (Hz)
Returns:
None:
None. Output is written to :const:`rfarray`
Warning:
Assumes PVH alignment (ray-polarization), i.e. :attr:`Control.rot="PVH"`.
"""
npts2 = npts // 2
rem = npts % 2
order = 2
def _bandpass(arr):
# from pyrocko.Trace.filter
(b, a) = _get_cached_bandpass_coefs(order, (2 * dt * fmin, 2 * dt * fmax))
arr -= np.mean(arr)
firstpass = signal.lfilter(b, a, arr)
return signal.lfilter(b, a, firstpass[::-1])[::-1]
# Crop unused overhang of oversized fortran arrays and transpose to
# [traces[components[samples]]] order
data = np.array(
[traces[0, :npts, :ntr], traces[1, :npts, :ntr], traces[2, :npts, :ntr]]
).transpose(2, 0, 1)
for n, trace in enumerate(data):
# assuming PVH:
rfarray[n, 0, npts2:] = _bandpass(trace[1][: npts2 + rem]) # SV
rfarray[n, 1, npts2:] = _bandpass(trace[2][: npts2 + rem]) # SH