import math
from obspy import UTCDateTime
from numpy import nan, isnan, abs
import numpy as np
import copy
from obspy.core import Stream, read
def floor_decimal(n, decimals=0):
multiplier = 10 ** decimals
return math.floor(n * multiplier) / multiplier
[docs]
def traceshift(trace, tt):
"""
Function to shift traces in time given travel time
Parameters
----------
trace : :class:`~obspy.core.Trace`
Seismogram
tt : float
Time delay (sec)
Returns
-------
rtrace : :class:`~obspy.core.Trace`
Shifted copy of the input trace
"""
# Define frequencies
nt = trace.stats.npts
dt = trace.stats.delta
freq = np.fft.fftfreq(nt, d=dt)
# Fourier transform
ftrace = np.fft.fft(trace.data)
# Shift
for i in range(len(freq)):
ftrace[i] = ftrace[i]*np.exp(-2.*np.pi*1j*freq[i]*tt)
# Back Fourier transform and return as trace
rtrace = trace.copy()
rtrace.data = np.real(np.fft.ifft(ftrace))
# Update start time
rtrace.stats.starttime -= tt
return rtrace
[docs]
def download_data(client=None, sta=None, start=UTCDateTime(),
end=UTCDateTime(), new_sr=0., verbose=False,
remove_response=False, zcomp='Z'):
"""
Function to build a stream object for a seismogram in a given time window
by getting data from a client object, either from a local SDS archive or
from an FDSN web-service. The function performs sanity checks for
the start times, sampling rates and window lengths.
Parameters
----------
client : :class:`~obspy.client.fdsn.Client`
Client object
sta : Dict
Station metadata from :mod:`~StDb` data base
start : :class:`~obspy.core.utcdatetime.UTCDateTime`
Start time for request
end : :class:`~obspy.core.utcdatetime.UTCDateTime`
End time for request
new_sr : float
New sampling rate (Hz)
verbose : bool
Whether or not to print messages to screen during run-time
remove_response : bool
Remove instrument response from seismogram and resitute to true ground
velocity (m/s) using obspy.core.trace.Trace.remove_response()
zcomp: str
Vertical Component Identifier. Should be a single character.
This is different then 'Z' only for fully unknown component
orientation (i.e., components are 1, 2, 3)
Returns
-------
err : bool
Boolean for error handling (`False` is associated with success)
trN : :class:`~obspy.core.Trace`
Trace of North component of motion
trE : :class:`~obspy.core.Trace`
Trace of East component of motion
trZ : :class:`~obspy.core.Trace`
Trace of Vertical component of motion
"""
for loc in sta.location:
# Construct location name
if loc == "--":
tloc = ""
else:
tloc = copy.copy(loc)
# Construct Channel List
cha = sta.channel.upper() + '?'
msg = "* {0:s}.{1:2s}?.{2:2s} - Checking Network".format(
sta.station, sta.channel.upper(), loc)
print(msg)
# Get waveforms, with extra 1 second to avoid
# traces cropped too short - traces are trimmed later
try:
st = client.get_waveforms(
network=sta.network,
station=sta.station,
location=tloc,
channel=cha,
starttime=start,
endtime=end+1.)
except Exception as e:
if verbose:
print("* Met exception:")
print("* " + e.__repr__())
st = None
else:
if len(st) == 3:
# It's possible if len(st)==1 that data is Z12
print("* - Data Downloaded")
# break
# Check the correct 3 components exist
if st is None or len(st) < 3:
print("* Error retrieving waveforms")
print("**************************************************")
return True, None
# Three components successfully retrieved
if remove_response:
try:
st.remove_response()
if verbose:
print("*")
print("* Restituted stream to true ground velocity.")
print("*")
except Exception as e:
print("*")
print('* Cannot remove response, moving on.')
print("*")
# Detrend and apply taper
st.detrend('demean').detrend('linear').taper(
max_percentage=0.05, max_length=5.)
# Check start times
if not np.all([tr.stats.starttime == start for tr in st]):
if verbose:
print("* Start times are not all close to true start: ")
[print("* "+tr.stats.channel+" " +
str(tr.stats.starttime)+" " +
str(tr.stats.endtime)) for tr in st]
print("* True start: "+str(start))
print("* -> Shifting traces to true start")
delay = [tr.stats.starttime - start for tr in st]
st_shifted = Stream(
traces=[traceshift(tr, dt) for tr, dt in zip(st, delay)])
st = st_shifted.copy()
# Check sampling rate
sr = st[0].stats.sampling_rate
sr_round = float(floor_decimal(sr, 0))
if not sr == sr_round:
if verbose:
print("* Sampling rate is not an integer value: ", sr)
print("* -> Resampling")
st.resample(sr_round, no_filter=False)
# Try trimming
try:
st.trim(start, end)
except Exception as e:
print("* Unable to trim")
print("* -> Skipping")
print("**************************************************")
return True, None
# Check final lengths - they should all be equal if start times
# and sampling rates are all equal and traces have been trimmed
if not np.allclose([tr.stats.npts for tr in st[1:]], st[0].stats.npts):
print("* Lengths are incompatible: ")
[print("* "+str(tr.stats.npts)) for tr in st]
print("* -> Skipping")
print("**************************************************")
return True, None
elif not np.allclose([st[0].stats.npts], int((end - start)*sr),
atol=1):
print("* Length is too short: ")
print("* "+str(st[0].stats.npts) +
" ~= "+str(int((end - start)*sr)))
print("* -> Skipping")
print("**************************************************")
return True, None
else:
print("* Waveforms Retrieved...")
return False, st