Source code for seispy.decon

# -*- coding: utf-8 -*-
import numpy as np
import obspy
from obspy.signal.util import next_pow_2
from numpy.fft import fft, ifft
# from scipy.linalg import solve_toeplitz


[docs] def gaussFilter(dt, nft, f0): """ Gaussian filter in frequency domain. :param dt: sample interval in second :type dt: float :param nft: number of samples :type nft: int :param f0: Gauss factor :type f0: float :return: Gaussian filter in frequency domain :rtype: np.ndarray """ df = 1.0 / (nft * dt) nft21 = 0.5 * nft + 1 f = df * np.arange(0, nft21) w = 2 * np.pi * f gauss = np.zeros([nft, 1]) gauss1 = np.exp(-0.25 * (w / f0) ** 2) / dt gauss1.shape = (len(gauss1), 1) gauss[0:int(nft21)] = gauss1 gauss[int(nft21):] = np.flipud(gauss[1:int(nft21) - 1]) gauss = gauss[:, 0] return gauss
[docs] def gfilter(x, nfft, gauss, dt): """ Apply Gaussian filter on time series. :param x: input trace :type x: np.ndarray :param nfft: number of samples :type nfft: int :param gauss: Gaussian filter in frequency domain :type gauss: np.ndarray :param dt: sample interval in second :type dt: float :return: Filtered data in time domain :rtype: np.ndarray """ Xf = fft(x, nfft) Xf = Xf * gauss * dt xnew = ifft(Xf, nfft).real return xnew
[docs] def correl(R, W, nfft): """ Correlation in frequency domain. :param R: numerator :type R: np.ndarray :param W: denominator :type W: np.ndarray :return: Correlation in frequency domain :rtype: np.ndarray """ x = ifft(fft(R, nfft) * np.conj(fft(W, nfft)), nfft) x = x.real return x
[docs] def phaseshift(x, nfft, dt, tshift): """ Phase shift in frequency domain. :param x: input trace :type x: np.ndarray :param nfft: number of samples :type nfft: int :param dt: sample interval in second :type dt: float :param tshift: Time shift before P arrival :type tshift: float :return: Phase shifted data in time domain :rtype: np.ndarray """ Xf = fft(x, nfft) shift_i = int(tshift / dt) p = 2 * np.pi * np.arange(1, nfft + 1) * shift_i / nfft Xf = Xf * np.vectorize(complex)(np.cos(p), -np.sin(p)) x = ifft(Xf, nfft) / np.cos(2 * np.pi * shift_i / nfft) x = x.real return x
[docs] def deconit(uin, win, dt, nt=None, tshift=10, f0=2.0, itmax=400, minderr=0.001, phase='P'): """ Iterative deconvolution using Ligorria & Ammon method. @author: Mijian Xu @ NJU Created on Wed Sep 10 14:21:38 2014 :param uin: R or Q component for the response function :type uin: np.ndarray :param win: Z or L component for the source function :type win: np.ndarray :param dt: sample interval in second :type dt: float :param nt: number of samples, defaults to None :type nt: int, optional :param tshift: Time shift before P arrival, defaults to 10. :type tshift: float, optional :param f0: Gauss factor, defaults to 2.0 :type f0: float, optional :param itmax: Max iterations, defaults to 400 :type itmax: int, optional :param minderr: Min change in error required for stopping iterations, defaults to 0.001 :type minderr: float, optional :param phase: Phase of the RF, defaults to 'P' :type phase: str, optional :return: (RFI, rms, it) RF, rms and number of iterations. :rtype: (np.ndarray, np.ndarray, int) """ # print('Iterative Decon (Ligorria & Ammon):\n') if len(uin) != len(win): raise ValueError('The two input trace must be in same length') elif nt is None: nt = len(uin) else: pass rms = np.zeros(itmax) nfft = next_pow_2(nt) p0 = np.zeros(nfft) u0 = np.zeros(nfft) w0 = np.zeros(nfft) u0[0:nt] = uin w0[0:nt] = win gaussF = gaussFilter(dt, nfft, f0) # gaussF = _gauss_filter(dt, nfft, f0) u_flt = gfilter(u0, nfft, gaussF, dt) w_flt = gfilter(w0, nfft, gaussF, dt) wf = fft(w0, nfft) r_flt = u_flt powerU = np.sum(u_flt ** 2) it = 0 sumsq_i = 1 d_error = 100 * powerU + minderr maxlag = 0.5 * nfft # print('\tMax Spike Display is ' + str((maxlag) * dt)) while np.abs(d_error) > minderr and it < itmax: rw = correl(r_flt, w_flt, nfft) rw = rw / np.sum(w_flt ** 2) if phase == 'P': i1 = np.argmax(np.abs(rw[0:int(maxlag) - 1])) else: i1 = np.argmax(np.abs(rw)) amp = rw[i1] / dt p0[i1] = p0[i1] + amp p_flt = gfilter(p0, nfft, gaussF, dt) p_flt = gfilter(p_flt, nfft, wf, dt) r_flt = u_flt - p_flt sumsq = np.sum(r_flt ** 2) / powerU rms[it] = sumsq d_error = 100 * (sumsq_i - sumsq) sumsq_i = sumsq it = it + 1 p_flt = gfilter(p0, nfft, gaussF, dt) p_flt = phaseshift(p_flt, nfft, dt, tshift) RFI = p_flt[0:nt] rms = rms[0:it - 1] return RFI, rms, it
[docs] def deconwater(uin, win, dt, tshift=10., wlevel=0.05, f0=2.0, normalize=False, phase='P'): """ Frequency-domain deconvolution using waterlevel method. :param uin: R or Q component for the response function :type uin: np.ndarray :param win: Z or L component for the source function :type win: np.ndarray :param dt: sample interval in second :type dt: float :param tshift: Time shift before P arrival, defaults to 10. :type tshift: float, optional :param wlevel: Waterlevel to stabilize the deconvolution, defaults to 0.05 :type wlevel: float, optional :param f0: Gauss factor, defaults to 2.0 :type f0: float, optional :param normalize: If normalize the amplitude of the RF, defaults to False :type normalize: bool, optional :return: (rf, rms) RF and final rms. :rtype: (np.ndarray, float) """ if uin.size != win.size: raise ValueError('The length of the \'uin\' must be same as the \'win\'') nt = uin.size nft = next_pow_2(nt) nfpts = nft / 2 + 1 # number of freq samples fny = 1. / (2.* dt); # nyquist delf = fny / (0.5 * nft) freq = delf * np.arange(nfpts) w = 2 * np.pi * freq # containers # rff = np.zeros(nft); # Rfn in freq domain upf = np.zeros(nft); # predicted numer in freq domain # Convert seismograms to freq domain uf = fft(uin, nft) wf = fft(win, nft) # denominator df = wf * wf.conjugate() dmax = max(df.real); # add water level correction phi1 = wlevel * dmax # water level # nwl = length( find(df<phi1) ) # number corrected df[np.where(df.real < phi1)[0]] = phi1 gaussF = gaussFilter(dt, nft, f0) nf = gaussF * uf * wf.conjugate() # compute RF rff = nf / df # compute predicted numerator upf = rff * wf # add phase shift to RF w = np.append(w, -np.flipud(w[1:-1])) rff = rff * np.exp(-1j * w * tshift) # back to time domain rft = ifft(rff , nft) rft = rft[0:nt].real # compute the fit uf = gaussF * uf # compare to filtered numerator ut = ifft(uf, nft).real upt = ifft(upf, nft).real powerU = np.sum(ut[0:nt] ** 2) rms = np.sum((upt[0:nt] - ut[0:nt]) ** 2 )/powerU if normalize: gnorm = np.sum(gaussF) * delf * dt rft = rft.real / gnorm return rft, rms
[docs] def deconvolute(uin, win, dt, method='iter', **kwargs): """ Deconvolute receiver function from waveforms. :param uin: R or Q component for the response function :type uin: np.ndarray :param win: Z or L component for the source function :type win: np.ndarray :param dt: sample interval in second :type dt: float :param method: Method for deconvolution, defaults to 'iter' :type method: str, optional :param kwargs: Parameters for deconvolution :type kwargs: dict :return: RF, rms, [iter] :rtype: np.ndarray, float, int """ if method.lower() == 'iter': return deconit(uin, win, dt, **kwargs) elif method.lower() == 'water': return deconwater(uin, win, dt, **kwargs) else: raise ValueError('method must be \'iter\' or \'water\'')
[docs] class RFTrace(obspy.Trace): """ Class for receiver function trace. """ def __init__(self, data=..., header=None): super().__init__(data=data, header=header)
[docs] @classmethod def deconvolute(cls, utr, wtr, method='iter', **kwargs): """ Deconvolute receiver function from waveforms. :param utr: R or Q component for the response function :type utr: obspy.Trace :param wtr: Z or L component for the source function :type wtr: obspy.Trace :param method: Method for deconvolution, defaults to 'iter' :type method: str, optional :param kwargs: Parameters for deconvolution :type kwargs: dict :return: RFTrace object :rtype: RFTrace """ header = utr.stats.__getstate__() for key, value in kwargs.items(): header[key] = value if method.lower() == 'iter': rf, rms, it = deconit(utr.data, wtr.data, utr.stats.delta, **kwargs) header['rms'] = rms header['iter'] = it elif method.lower() == 'water': rf, rms = deconwater(utr.data, wtr.data, utr.stats.delta, **kwargs) header['rms'] = rms header['iter'] = np.nan else: raise ValueError('method must be \'iter\' or \'water\'') return cls(rf, header)