Source code for seispy.eq

import numpy as np
import obspy
from obspy.io.sac import SACTrace
from obspy.signal.rotate import rotate2zne, rotate_zne_lqt
from scipy.signal import resample
from os.path import join
from seispy.decon import RFTrace
from seispy.geo import snr, srad2skm, rotateSeisENtoTR, \
                       rssq, extrema
from seispy.utils import scalar_instance
from obspy.signal.trigger import recursive_sta_lta
import glob


[docs] class NotEnoughComponent(Exception): def __init__(self, matchkey): self.matchkey = matchkey def __str__(self): return '{}'.format(self.matchkey)
[docs] class TooMoreComponents(Exception): def __init__(self, matchkey): self.matchkey = matchkey def __str__(self): return '{}'.format(self.matchkey)
[docs] def rotateZNE(st): """Rotate Z, N, E components to Z, N, E components """ try: zne = rotate2zne( st[0], st[0].stats.sac.cmpaz, st[0].stats.sac.cmpinc, st[1], st[1].stats.sac.cmpaz, st[1].stats.sac.cmpinc, st[2], st[2].stats.sac.cmpaz, st[2].stats.sac.cmpinc) except Exception as e: raise ValueError('No specified cmpaz or cmpinc. {}'.format(e)) for tr, new_data, component in zip(st, zne, "ZNE"): tr.data = new_data tr.stats.channel = tr.stats.channel[:-1] + component
[docs] class EQ(object): def __init__(self, pathname, datestr, suffix='SAC'): """Class for processing event data with 3 components, which read SAC files of ``pathname*datastr*suffix`` :param pathname: Directory to SAC files :type pathname: string :param datestr: date part in filename, e.g., ``2021.122.12.23.40`` :type datestr: string :param suffix: suffix for SAC files, defaults to 'SAC' :type suffix: str, optional """ self.datestr = datestr self.filestr = join(pathname, '*' + datestr + '*' + suffix) if glob.glob(self.filestr): self.st = obspy.read(self.filestr) self._check_comp() self.st.sort() self.set_comp() else: self.st = obspy.Stream() self.rf = obspy.Stream() self.timeoffset = 0 self.rms = np.array([0]) self.it = 0 self.trigger_shift = 0 self.inc_correction = 0 def __str__(self): return('Event data class {0}'.format(self.datestr)) def _check_comp(self): if len(self.st) < 3: channel = ' '.join([tr.stats.channel for tr in self.st]) raise NotEnoughComponent('Sismogram must be in 3 components, but there are only channel {} of {}'.format(channel, self.datestr)) elif len(self.st) > 3: raise TooMoreComponents('{} has more than 3 components, please select to delete redundant seismic components'.format(self.datestr)) else: pass
[docs] def readstream(self): """Read SAC files to stream """ self.rf = obspy.Stream() self.st = obspy.read(self.filestr) self._check_comp() self.st.sort() self.set_comp()
def _cleanstream(self): self.st = None self.rf = None
[docs] @classmethod def from_stream(cls, stream): """Create EQ object from obspy stream :param stream: obspy stream :type stream: obspy.Stream :return: EQ object :rtype: EQ """ eq = cls('', '') eq.st = stream eq.datestr = eq.st[0].stats.starttime.strftime('%Y.%j.%H.%M.%S') eq._check_comp() eq.st.sort() eq.set_comp() return eq
[docs] def set_comp(self): """Set component name """ if self.st.select(channel='*[E2]'): self.comp = 'enz' elif self.st.select(channel='*R'): self.comp = 'rtz' elif self.st.select(channel='*Q'): self.comp = 'lqt' else: raise ValueError('No such component in R, E or Q')
[docs] def channel_correct(self, switchEN=False, reverseE=False, reverseN=False): """Correct channel name for R, E, N components :param switchEN: _description_, defaults to False :type switchEN: bool, optional :param reverseE: _description_, defaults to False :type reverseE: bool, optional :param reverseN: _description_, defaults to False :type reverseN: bool, optional """ if reverseE: self.st.select(channel='*[E2]')[0].data *= -1 if reverseN: self.st.select(channel='*[N1]')[0].data *= -1 if switchEN: chE = self.st.select(channel='*[E2]')[0].stats.channel chN = self.st.select(channel='*[N1]')[0].stats.channel self.st.select(channel='*[E2]')[0].stats.channel = chN self.st.select(channel='*[N1]')[0].stats.channel = chE
[docs] def write(self, path, evt_datetime): """Write raw stream to SAC files :param path: path to save SAC files :type path: str :param evt_datetime: event datetime :type evt_datetime: obspy.core.utcdatetime.UTCDateTime """ for tr in self.st: sac = SACTrace.from_obspy_trace(tr) sac.b = 0 sac.o = evt_datetime - tr.stats.starttime fname = join(path, '{}.{}.{}.{}.SAC'.format( tr.stats.network, tr.stats.station, tr.stats.starttime.strftime('%Y.%j.%H%M%S'), tr.stats.channel )) sac.write(fname)
[docs] def detrend(self): """Detrend and demean """ self.st.detrend(type='linear') self.st.detrend(type='constant') self.fix_channel_name()
[docs] def filter(self, freqmin=0.05, freqmax=1, order=4): """Bandpass filter :param freqmin: minimum frequency, defaults to 0.05 :type freqmin: float, optional :param freqmax: maximum frequency, defaults to 1 :type freqmax: float, optional :param order: filter order, defaults to 4 :type order: int, optional """ self.st.filter('bandpass', freqmin=freqmin, freqmax=freqmax, corners=order, zerophase=True)
[docs] def get_arrival(self, model, evdp, dis, phase='P'): """Get arrival time, ray parameter and incident angle from TauP model :param model: TauP model :type model: TauPyModel :param evdp: focal depth :type evdp: float :param dis: epicentral distance :type dis: float :param phase: phase name, defaults to 'P' :type phase: str, optional """ self.phase = phase arrivals = model.get_travel_times(evdp, dis, phase_list=[phase]) if not arrivals: raise ValueError('The phase of {} is not exists. Please check the setting of distance and phase'.format(phase)) # if len(arrivals) > 1: # raise ValueError('More than one phase were calculated with distance of {} and focal depth of {}'.format(dis, evdp)) # else: self.arr_time = arrivals[0].time self.rayp = arrivals[0].ray_param self.inc = arrivals[0].incident_angle
[docs] def search_inc(self, bazi): """Search incident angle for S wave :param bazi: back azimuth :type bazi: float :return: incident angle :rtype: float """ inc_range = np.arange(0.1, 90, 0.1) s_range = self.trim(20, 20, isreturn=True) power = np.zeros(inc_range.shape[0]) for i in range(len(inc_range)): l_comp, _, _ = rotate_zne_lqt(s_range[2].data, s_range[1].data, s_range[0].data, bazi, inc_range[i]) power[i] = np.mean(l_comp ** 2) real_inc_idx = np.argmin(power) real_inc = inc_range[real_inc_idx] self.inc_correction = real_inc - self.inc self.inc = real_inc
[docs] def search_baz(self, bazi, time_b=10, time_e=20, offset=90): """Search back azimuth for P wave :param bazi: back azimuth :type bazi: float :param time_b: time before P arrival, defaults to 10 :type time_b: int, optional :param time_e: time after P arrival, defaults to 20 :type time_e: int, optional :param offset: offset for searching, defaults to 90 :type offset: int, optional :return: back azimuth and amplitude :rtype: (float, np.ndarray) """ p_arr = self.arr_correct(write_to_sac=False) this_st = self.st.copy() this_st.filter('bandpass', freqmin=0.03, freqmax=0.5) this_st.trim(this_st[0].stats.starttime+p_arr-time_b, this_st[0].stats.starttime+p_arr+time_e) bazs = np.arange(-offset, offset) + bazi ampt = np.zeros_like(bazs) for i, b in enumerate(bazs): t, _ = rotateSeisENtoTR(this_st[0].data, this_st[1].data, b) ampt[i] = rssq(t) ampt = ampt / np.max(ampt) idx = extrema(ampt, opt='min') if len(idx) == 0: corr_baz = np.nan elif len(idx) > 1: corr_baz = None else: corr_baz = bazs[idx[0]] - bazi return corr_baz, ampt
[docs] def fix_channel_name(self): """Fix channel name for R, E, N components """ if self.st.select(channel='??1') and self.st.select(channel='??Z') and hasattr(self.st.select(channel='*1')[0].stats.sac, 'cmpaz'): if self.st.select(channel='*1')[0].stats.sac.cmpaz == 0: self.st.select(channel='*1')[0].stats.channel = self.st.select(channel='*1')[0].stats.channel[:-1] + 'N' self.st.select(channel='*2')[0].stats.channel = self.st.select(channel='*2')[0].stats.channel[:-1] + 'E' elif self.st.select(channel='*1')[0].stats.sac.cmpaz != 0: rotateZNE(self.st) self.st.sort(['channel']) elif self.st.select(channel='*1'): self.st.select(channel='*1')[0].stats.channel = 'Z' self.st.select(channel='*2')[0].stats.channel = 'N' self.st.select(channel='*3')[0].stats.channel = 'E' self.st.sort(['channel']) else: pass
[docs] def rotate(self, baz, inc=None, method='NE->RT', search_inc=False, baz_shift=0): """Rotate to radial and transverse components :param baz: back azimuth :type baz: float :param inc: incident angle, defaults to None :type inc: float, optional :param method: method for rotation, defaults to 'NE->RT' :type method: str, optional :param search_inc: whether search incident angle, defaults to False :type search_inc: bool, optional :param baz_shift: shift back azimuth, defaults to 0 :type baz_shift: int, optional """ bazi = np.mod(baz + baz_shift, 360) if inc is None: if self.phase[-1] == 'S' and search_inc: inc = self.search_inc(bazi) else: self.inc = inc if method == 'NE->RT': self.comp = 'rtz' self.st.rotate('NE->RT', back_azimuth=bazi) elif method == 'RT->NE': self.st.rotate('RT->NE', back_azimuth=bazi) elif method == 'ZNE->LQT': self.comp = 'lqt' self.st.rotate('ZNE->LQT', back_azimuth=bazi, inclination=self.inc) elif method == 'LQT->ZNE': self.st.rotate('LQT->ZNE', back_azimuth=bazi, inclination=self.inc) else: pass
[docs] def snr(self, length=50): """Calculate SNR :param length: length for noise, defaults to 50 :type length: int, optional :return: SNR of E, N, Z components :rtype: (float, float, float) """ st_noise = self.trim(length, 0, isreturn=True) st_signal = self.trim(0, length, isreturn=True) try: snr_E = snr(st_signal[0].data, st_noise[0].data) except IndexError: snr_E = 0 try: snr_N = snr(st_signal[1].data, st_noise[1].data) except IndexError: snr_N = 0 try: snr_Z = snr(st_signal[2].data, st_noise[2].data) except IndexError: snr_Z = 0 return snr_E, snr_N, snr_Z
[docs] def get_time_offset(self, event_time=None): """Get time offset from SAC header :param event_time: event time, defaults to None :type event_time: obspy.core.utcdatetime.UTCDateTime, optional """ if event_time is not None and not isinstance(event_time, obspy.core.utcdatetime.UTCDateTime): raise TypeError('Event time should be UTCDateTime type in obspy') elif event_time is None: self.timeoffset = self.st[2].stats.sac.b - self.st[2].stats.sac.o else: self.timeoffset = self.st[2].stats.starttime - event_time
[docs] def arr_correct(self, write_to_sac=True): """ offset = sac.b - real o """ correct_time = self.arr_time - self.timeoffset if write_to_sac: for tr in self.st: tr.stats.sac.t0 = correct_time tr.stats.sac.kt0 = self.phase return correct_time
def _get_time(self, time_before, time_after): arr = self.arr_correct(write_to_sac=False) t1 = self.st[2].stats.starttime + (arr + self.trigger_shift - time_before) t2 = self.st[2].stats.starttime + (arr + self.trigger_shift + time_after) return t1, t2
[docs] def phase_trigger(self, time_before, time_after, prepick=True, stl=5, ltl=10): """ Trigger P or S phase :param time_before: time before P or S arrival :type time_before: float :param time_after: time after P or S arrival :type time_after: float :param prepick: whether use prepick, defaults to True :type prepick: bool, optional :param stl: short time length for STA/LTA, defaults to 5 :type stl: int, optional :param ltl: long time length for STA/LTA, defaults to 10 :type ltl: int, optional """ t1, t2 = self._get_time(time_before, time_after) self.st_pick = self.st.copy().trim(t1, t2) if len(self.st_pick) == 0: return if prepick: if self.phase[-1] == 'P': tr = self.st_pick.select(channel='*Z')[0] else: tr = self.st_pick.select(channel='*T')[0] df = tr.stats.sampling_rate cft = recursive_sta_lta(tr.data, int(stl*df), int(ltl*df)) n_trigger = np.argmax(np.diff(cft)[int(ltl*df):])+int(ltl*df) self.t_trigger = t1 + n_trigger/df self.trigger_shift = n_trigger/df - time_before else: self.t_trigger = t1 self.trigger_shift = 0.0
[docs] def trim(self, time_before, time_after, isreturn=False): """ offset = sac.b - real o """ t1, t2 = self._get_time(time_before, time_after) if isreturn: return self.st.copy().trim(t1, t2) else: self.st.trim(t1, t2)
[docs] def deconvolute(self, shift, time_after, f0=2.0, method='iter', only_r=False, itmax=400, minderr=0.001, wlevel=0.05, target_dt=None): """Deconvolution Parameters ---------- shift : float Time shift before P arrival time_after : float Time length after P arrival f0 : float or list, optional Gaussian factors, by default 2.0 method : str, optional method for deconvolution in ``iter`` or ``water``, by default ``iter`` only_r : bool, optional Whether only calculate RF in prime component, by default False itmax : int, optional Maximum iterative number, valid for method of ``iter``, by default 400 minderr : float, optional Minium residual error, valid for method of ``iter``, by default 0.001 wlevel : float, optional Water level, valid for method of ``water``, by default 0.05 target_dt : None or float, optional Time delta for resampling, by default None """ self.method = method if scalar_instance(f0): f0 = [f0] for ff in f0: if method == 'iter': kwargs = {'method': method, 'f0': ff, 'tshift': shift, 'itmax': itmax, 'minderr': minderr} elif method == 'water': kwargs = {'method': method, 'f0': ff, 'tshift': shift, 'wlevel': wlevel} else: raise ValueError('method must be in \'iter\' or \'water\'') if self.phase[-1] == 'P': self.decon_p(**kwargs) if not only_r: self.decon_p(tcomp=True, **kwargs) else: # TODO: if 'Q' not in self.rf[1].stats.channel or 'L' not in self.rf[2].stats.channel: # raise ValueError('Please rotate component to \'LQT\'') self.decon_s(**kwargs) if target_dt is not None: for tr in self.rf: if tr.stats.delta != target_dt: tr.data = resample(tr.data, int((shift + time_after)/target_dt+1)) tr.stats.delta = target_dt
[docs] def decon_p(self, tshift, tcomp=False, **kwargs): """Deconvolution for P wave :param tshift: Time shift before P arrival :type tshift: float :param tcomp: Whether calculate transverse component, defaults to False :type tcomp: bool, optional """ if self.comp == 'lqt': win = self.st.select(channel='*L')[0] if tcomp: uin = self.st.select(channel='*T')[0] else: uin = self.st.select(channel='*Q')[0] uin.data *= -1 else: win = self.st.select(channel='*Z')[0] if tcomp: uin = self.st.select(channel='*T')[0] else: uin = self.st.select(channel='*R')[0] uout = RFTrace.deconvolute(uin, win, phase='P', tshift=tshift, **kwargs) self.rf.append(uout)
[docs] def decon_s(self, tshift, **kwargs): """Deconvolution for S wave :param tshift: Time shift before P arrival :type tshift: float """ if self.comp == 'lqt': win = self.st.select(channel='*Q')[0] uin = self.st.select(channel='*L')[0] else: win = self.st.select(channel='*R')[0] uin = self.st.select(channel='*Z')[0] win.data *= -1 # win.data[0:int((tshift-4)/win.stats.delta)] = 0 uout = RFTrace.deconvolute(uin, win, phase='S', tshift=tshift, **kwargs) uout.data = np.flip(uout.data) self.rf.append(uout)
[docs] def saverf(self, path, evtstr=None, shift=0, evla=-12345., evlo=-12345., evdp=-12345., mag=-12345., gauss=0, baz=-12345., gcarc=-12345., only_r=False, **kwargs): """Save receiver function to SAC file :param path: path to save SAC file :type path: str :param evtstr: event string, defaults to None :type evtstr: str, optional :param shift: time shift before P arrival, defaults to 0 :type shift: int, optional :param evla: event latitude, defaults to -12345. :type evla: float, optional :param evlo: event longitude, defaults to -12345. :type evlo: float, optional :param evdp: event depth, defaults to -12345. :type evdp: float, optional :param mag: event magnitude, defaults to -12345. :type mag: float, optional :param gauss: Gaussian factor, defaults to 0 :type gauss: float, optional :param baz: back azimuth, defaults to -12345. :type baz: float, optional :param gcarc: epicentral distance, defaults to -12345. :type gcarc: float, optional :param only_r: whether only save R component, defaults to False :type only_r: bool, optional """ if self.phase[-1] == 'P': if self.comp == 'lqt': svcomp = 'Q' else: svcomp = 'R' if only_r: loop_lst = [svcomp] else: loop_lst = [svcomp, 'T'] rayp = srad2skm(self.rayp) elif self.phase[-1] == 'S': if self.comp == 'lqt': loop_lst = ['L'] else: loop_lst = ['Z'] rayp = srad2skm(self.rayp) else: pass if evtstr is None: filename = join(path, self.datestr) else: filename = join(path, evtstr) for comp in loop_lst: trrfs = self.rf.select(channel='*'+comp) try: trrf = [tr for tr in trrfs if tr.stats.f0 == gauss][0] except: ValueError('No such gauss factor of {} in calculated RFs'.format(gauss)) header = {'evla': evla, 'evlo': evlo, 'evdp': evdp, 'mag': mag, 'baz': baz, 'gcarc': gcarc, 'user0': rayp, 'kuser0': 'Ray Para', 'user1': gauss, 'kuser1': 'G factor'} for key in kwargs: header[key] = kwargs[key] for key, value in header.items(): trrf.stats['sac'][key] = value tr = SACTrace.from_obspy_trace(trrf) tr.b = -shift tr.a = 0 tr.ka = self.phase tr.write(filename + '_{0}_{1}.sac'.format(self.phase, tr.kcmpnm[-1]))
def _s_condition(self, trrf, shift): nt0 = int(np.floor((shift)/trrf.stats.delta)) nt25 = int(np.floor((shift+25)/trrf.stats.delta)) if rssq(trrf.data[nt0:nt25]) > rssq(trrf.data[nt25:]): return True else: return False
[docs] def judge_rf(self, gauss, shift, npts, criterion='crust', rmsgate=None): """Judge whether receiver function is valid :param gauss: Gaussian factor :type gauss: float :param shift: time shift before P arrival :type shift: float :param npts: number of points for RF :type npts: int :param criterion: criterion for judging, defaults to 'crust' :type criterion: str, optional :param rmsgate: RMS gate, defaults to None :type rmsgate: float, optional :return: whether RF is valid :rtype: bool """ if self.phase[-1] == 'P' and self.comp == 'rtz': trrfs = self.rf.select(channel='*R') elif self.phase[-1] == 'P' and self.comp == 'lqt': trrfs = self.rf.select(channel='*Q') elif self.phase[-1] == 'S' and self.comp == 'lqt': trrfs = self.rf.select(channel='*L') elif self.phase[-1] == 'S' and self.comp == 'rtz': trrfs = self.rf.select(channel='*Z') for tr in trrfs: if tr.stats.npts != npts: return False try: trrf = [tr for tr in trrfs if tr.stats.f0 == gauss][0] except: ValueError('No such gauss factor of {} in calculated RFs'.format(gauss)) # All points are NaN if np.isnan(trrf.data).all(): return False if np.isinf(trrf.data).any(): return False # Final RMS if rmsgate is not None: if self.method == 'iter': rms = trrf.stats.rms[-1] else: rms = trrf.stats.rms rmspass = rms < rmsgate else: rmspass = True # R energy nt1 = int(np.floor((5+shift)/trrf.stats.delta)) reng = np.sum(np.sqrt(trrf.data[nt1:] ** 2)) if reng < 0.1: rengpass = False else: rengpass = True # Max amplitude if criterion == 'crust': time_P1 = int(np.floor((-2+shift)/trrf.stats.delta)) time_P2 = int(np.floor((4+shift)/trrf.stats.delta)) max_P = np.max(trrf.data[time_P1:time_P2]) if max_P == np.max(np.abs(trrf.data)) and max_P < 1 and rmspass and rengpass: return True and rengpass else: return False elif criterion == 'mtz': max_deep = np.max(np.abs(trrf.data[int((30 + shift) / trrf.stats.delta):])) time_P1 = int(np.floor((-5 + shift) / trrf.stats.delta)) time_P2 = int(np.floor((5 + shift) / trrf.stats.delta)) max_P = np.max(trrf.data[time_P1:time_P2]) if max_deep < max_P * 0.4 and rmspass and rengpass and\ max_P == np.max(np.abs(trrf.data)) and max_P < 1: return True and rengpass else: return False elif criterion == "lab": return self._s_condition(trrf, shift) and rengpass elif criterion is None: return rmspass and rengpass else: pass
if __name__ == '__main__': pass