#!/usr/bin/env python # Wenchang Yang (wenchang@princeton.edu) # Wed Jun 30 17:02:45 EDT 2021 if __name__ == '__main__': from misc.timer import Timer tt = Timer(f'start {__file__}') import sys, os.path, os, glob, datetime import xarray as xr, numpy as np, pandas as pd, matplotlib.pyplot as plt #more imports from scipy.optimize import minimize from scipy.stats import rankdata, genextreme as gev from numpy import exp, log from tqdm import tqdm # if __name__ == '__main__': tt.check('end import') # #start from here # empirical cdf, ep and rp def smp_cdf(data): xx = np.sort(da) yy = rankdata(xx)/xx.size return xx, yy def smp_exceedance_prob(data): """empirical exceedance probability""" xx = np.sort(da) ep = 1- (rankdata(xx)-1)/xx.size return xx, ep def smp_return_period(data): """empirical return period""" xx, ep = smp_exceedance_prob(data) rp = 1/ep return xx, rp def plot_smp_return_period(data, ax=None, **kws): if ax is None: fig, ax = plt.subplots() ls = kws.pop('ls', 'none') marker = kws.pop('marker', 'o') fillstyle = kws.pop('fillstyle', 'none') alpha = kws.pop('alpha', 0.5) xx, rp = smp_return_period(data) ax.plot(rp, xx, ls=ls, marker=marker, fillstyle=fillstyle, alpha=alpha, **kws) ax.set_xscale('log') def plot_smp_exceedance_prob(data, ax=None, **kws): if ax is None: fig, ax = plt.subplots() ls = kws.pop('ls', 'none') marker = kws.pop('marker', 'o') fillstyle = kws.pop('fillstyle', 'none') alpha = kws.pop('alpha', 0.5) xx, ep = smp_exceedance_prob(data) ax.plot(xx, ep, ls=ls, marker=marker, fillstyle=fillsytle, alpha=alpha, **kws) ax.set_yscale('log') #theoretical cdf, ep and rp def gev_cdf(xx, mu, sigma, xi): """GEV cdf. xx: sample(s) mu: location sigma: scale xi: shape Ref: https://en.wikipedia.org/wiki/Generalized_extreme_value_distribution """ s = (xx - mu)/sigma if xi == 0: return exp( -exp(-s) ) else: #xi != 0 # xi*s > -1: exp( -(1+xi*s)**(-1/xi) ) # xi*s <=-1; xi<0: 1 # xi*s <=-1; xi>0: 0 return (xi*s>-1)*exp( -(1+xi*s)**(-1/xi) ) + (xi*s<=-1)*(xi<0) #pdf def gev_pdf(xx, mu, sigma, xi): """GEV pdf. xx: sample(s) mu: location sigma: scale xi: shape Ref: https://en.wikipedia.org/wiki/Generalized_extreme_value_distribution """ s = (xx-mu)/sigma if xi == 0: tx = exp(-s) else: tx = (1+xi*s)**(-1/xi) return (xi*s>-1)*(1/sigma) * (tx**(1+xi)) * exp(-tx) #negative log likelihood def gev_shift_negLogLikelihood(params, xx, tt): """GEV negative log likelihood. params: (mu, sigma, xi) xx: sample(s) Ref: https://en.wikipedia.org/wiki/Generalized_extreme_value_distribution """ mu0, sigma, xi, alpha = params mu = mu0 + alpha*tt s = (xx - mu)/sigma if xi == 0: return xx.size*log(sigma) + np.sum(s + exp(-s)) else: return -np.sum( log(xi*s>-1) ) + xx.size*log(sigma) + np.sum( (1+1/xi)*log(1+xi*s) ) + np.sum( (1+xi*s)**(-1/xi) ) def gev_exceedance_prob(xx, mu, sigma, xi): return 1 - gev_cdf(xx, mu, sigma, xi) def gev_return_period(xx, mu, sigma, xi): #return 1/( 1 - gev_cdf(xx, mu, sigma, xi) ) return 1/gev_exceedance_prob(xx, mu, sigma, xi) def gev_return_period_inverse(rp, mu, sigma, xi): return gev.ppf(1-1/rp, c=-xi, loc=mu, scale=sigma) def gev_shift_fit(xx, tt, **kws): method = kws.pop('method', 'Nelder-Mead') bounds_default = ( (None,None), (0,None), (None, None), (None, None)) bounds = kws.pop('bounds', bounds_default) x0_default = (xx.mean(), xx.std(), 0, 0) x0 = kws.pop('x0', x0_default) r = minimize(gev_shift_negLogLikelihood, x0, args=(xx,tt), method=method, bounds=bounds, **kws) return r def gev_shift_fit_bootstrap(xx, tt, nmc=20, **kws): np.random.seed(0) #xxmc = np.random.choice(xx, size=(nmc, xx.size)) mci = np.random.choice(xx.size, size=(nmc, xx.size)) xxmc = xx[mci] ttmc = tt[mci] params = np.zeros(shape=(nmc, 4)) + np.nan for ii in tqdm(range(nmc)): r = gev_shift_fit(xxmc[ii,:], ttmc[ii,:], **kws) if r.success: params[ii,:] = r.x else: print(f'mc = {ii};', r.message) #with mp.Pool(processes=min(40, mp.cpu_count(), nmc)) as p: # p.map(func_bs, range(nmc)) mu0 = xr.DataArray(params[:,0], dims='mc') sigma = xr.DataArray(params[:,1], dims='mc') xi = xr.DataArray(params[:,2], dims='mc') alpha = xr.DataArray(params[:,3], dims='mc') ds = xr.Dataset(dict(mu=mu0, sigma=sigma, xi=xi, alpha=alpha)) return ds def plot_cdf(mu, sigma, xi, user_defined=True, ax=None, **kws): if ax is None: fig, ax = plt.subplots() #mu, sigma, xi = 0, 1, -0.2 xx = np.linspace( gev.ppf(0.001, c=-xi, loc=mu, scale=sigma), gev.ppf(0.999, c=-xi, loc=mu, scale=sigma), 1000) if user_defined: ax.plot(xx, gev_cdf(xx, mu=mu, sigma=sigma, xi=xi), **kws) else: ax.plot(xx, gev.cdf(xx, c=-xi, loc=mu, scale=sigma), **kws) def plot_pdf(mu, sigma, xi, user_defined=True, ax=None, **kws): if ax is None: fig,ax = plt.subplots() #mu, sigma, xi = 0, 1, -0.2 xx = np.linspace( gev.ppf(0.001, c=-xi, loc=mu, scale=sigma), gev.ppf(0.999, c=-xi, loc=mu, scale=sigma), 1000) if user_defined: ax.plot(xx, gev_pdf(xx, mu=mu, sigma=sigma, xi=xi), **kws) else: ax.plot(xx, gev.pdf(xx, c=-xi, loc=mu, scale=sigma), **kws) def plot_return_period(mu, sigma, xi, lower=1/.99, upper=1000, ax=None, **kws): if ax is None: fig,ax = plt.subplots() xx = np.linspace( gev.ppf(1-1/lower, c=-xi, loc=mu, scale=sigma), gev.ppf(1-1/upper, c=-xi, loc=mu, scale=sigma), 1000) rp = gev_return_period(xx, mu, sigma, xi) ax.plot(rp, xx, **kws) ax.set_xscale('log') def plot_exceedance_prob(mu, sigma, xi, lower=0.001, upper=0.999, ax=None, **kws): if ax is None: fig,ax = plt.subplots() xx = np.linspace( gev.ppf(1-upper, c=-xi, loc=mu, scale=sigma), gev.ppf(1-lower, c=-xi, loc=mu, scale=sigma), 1000) ep = gev_exceedance_prob(xx, mu, sigma, xi) ax.plot(xx, ep, **kws) ax.set_yscale('log') if __name__ == '__main__': from wyconfig import * #my plot settings plt.close('all') ifile = '../misc/itxx_era5_index.nc' da = xr.open_dataarray(ifile, decode_times=False).rename(time='year').assign_coords(year=range(1950,2022)) ts = da ifile = '../misc/igiss_al_gl_m.nc' da = xr.open_dataarray(ifile, decode_times=False) da = da.assign_coords(time=pd.date_range('1880-01', freq='MS', periods=da.size)) da = da.groupby('time.year').mean('time').pipe(lowpass).sel(year=slice(1950,2021)) gmst = da ds = xr.Dataset(dict(eindex=ts, gmst=gmst)) r = gev_shift_fit_bootstrap(ts.values, gmst.values, nmc=100) sys.exit() if len(sys.argv)>2: # two input data files to compare if len(sys.argv)>3: # specify a return period of the first dataset and get its value in the second dataset rp_original = int(sys.argv[3]) else: rp_original = 1000 print('original return period:', rp_original) da0 = xr.open_dataarray(sys.argv[1]) da1 = xr.open_dataarray(sys.argv[2]) fig, ax = plt.subplots() for ii,da in enumerate((da0, da1)): print('dataset', ii) color = f'C{ii}' plot_smp_return_period(da, ax=ax, color=color)# emperical return period #fit by scipy.stats.genextreme c, mu_, sigma_ = gev.fit(da) xi_ = -c print(f'genextreme: {mu_=}; {sigma_=}; {xi_=}') #fit by user defined likelihood function r = gev_fit(da) if r.success: mu, sigma, xi = r.x label = f'GEV fit: $\\mu$={mu:.3f}; $\\sigma$={sigma:.3f}; $\\xi$={xi:.3f}' print(f'wy: {mu=}; {sigma=}; {xi=}') plot_return_period(mu, sigma, xi, upper=da.size*2, ax=ax, ls='-', label=label, color=color) #return period 1000 level if color=='C0': levelc = gev_return_period_inverse(rp_original, mu, sigma, xi) print(f'level of original return period {rp_original}:', levelc) ax.axhline(levelc, color='C0', ls='--') elif color=='C1': rp_new = gev_return_period(levelc, mu, sigma, xi) print('new return period of the same level:', rp_new) print('probability ratio:', rp_original/rp_new) #ax.axvline(rp_new, color='gray', ls='--') else: print(f'{r = }') print('[failed]:', r.message) print() ax.legend() ax.set_xlabel('return period') ax.set_ylabel(da0.name) elif len(sys.argv)>1:#read input data file and do the analysis; otherwise, do the validation ifile = sys.argv[1] da = xr.open_dataarray(ifile) #return period fig,ax = plt.subplots() plot_smp_return_period(da, ax=ax) c, mu_, sigma_ = gev.fit(da) xi_ = -c print(f'genextreme: {mu_=}; {sigma_=}; {xi_=}') #plot_return_period(mu_, sigma_, xi_, upper=5000, ax=ax, label=f'genextreme: $\\mu$={mu_:5.3f}; $\\sigma$={sigma_:5.3f}; $\\xi$={xi_:5.3f}') r = gev_fit(da) if r.success: mu, sigma, xi = r.x label = f'GEV fit: $\\mu$={mu:.3f}; $\\sigma$={sigma:.3f}; $\\xi$={xi:.3f}' print(f'wy: {mu=}; {sigma=}; {xi=}') plot_return_period(mu, sigma, xi, upper=da.size*2, ax=ax, ls='--', label=label) else: print(f'{r = }') print('[failed]:', r.message) ax.legend() ax.set_xlabel('return period') ax.set_ylabel(da.name) #print('return period 1000 value:', gev_return_period_inverse(1000, mu, sigma, xi)) #print('31.944997567038687 return period:', gev_return_period(31.944997567038687 ,mu, sigma, xi)) #exceedance probability fig,ax = plt.subplots() plot_smp_exceedance_prob(da, ax=ax) if r.success: plot_exceedance_prob(mu,sigma,xi, lower=1/(da.size*2), ax=ax, ls='--', label=label) ax.legend() ax.set_xlabel(da.name) ax.set_ylabel('exceedance probability') else: #do the validation #validate user defined cdf, pdf fig,axes = plt.subplots(2, 1, figsize=(6,5)) ax = axes[0] mu, sigma = 0, 1 for xi in (-0.5, 0, 0.5): plot_cdf(mu, sigma, xi, ax=ax, user_defined=False, label=f'{mu=};{sigma=};{xi=}: genextreme') plot_cdf(mu, sigma, xi, ax=ax, user_defined=True, ls='--', label=f'{mu=};{sigma=};{xi=}: WY') ax.set_ylabel('GEV cdf') ax.legend() ax.set_xlim(None,15) ax = axes[1] mu, sigma = 0, 1 for xi in (-0.5, 0, 0.5): plot_pdf(mu, sigma, xi, ax=ax, user_defined=False, label=f'{mu=};{sigma=};{xi=}: genextreme') plot_pdf(mu, sigma, xi, ax=ax, user_defined=True, ls='--', label=f'{mu=};{sigma=};{xi=}: WY') ax.set_ylabel('GEV pdf') ax.legend() ax.set_xlim(None,15) #validate user defined fit ifile = '/tigress/wenchang/analysis/heatwave2021/TXx_FLOR_CTL1990_0101-1000.nc' da = xr.open_dataarray(ifile) fig, axes = plt.subplots(2,1, figsize=(6,5)) #return perio ax = axes[0] plot_smp_return_period(da, ax=ax) c, mu_, sigma_ = gev.fit(da) xi_ = -c print(f'genextreme: {mu_=}; {sigma_=}; {xi_=}') plot_return_period(mu_, sigma_, xi_, upper=5000, ax=ax, label=f'genextreme: $\\mu$={mu_:5.3f}; $\\sigma$={sigma_:5.3f}; $\\xi$={xi_:5.3f}') r = gev_fit(da) if r.success: mu, sigma, xi = r.x print(f'wy: {mu=}; {sigma=}; {xi=}') plot_return_period(mu, sigma, xi, upper=5000, ax=ax, ls='--', label=f'WY: $\\mu$={mu:.3f}; $\\sigma$={sigma:.3f}; $\\xi$={xi:.3f}') else: print(f'{r = }') print('[failed]:', r.message) ax.legend() ax.set_xlabel('return period [yr]') ax.set_ylabel('Txx [$^\circ$C]') #exceedance prob ax = axes[1] plot_smp_exceedance_prob(da, ax=ax) plot_exceedance_prob(mu_, sigma_, xi_, lower=0.001/2, ax=ax, label='genextreme') if r.success: plot_exceedance_prob(mu, sigma, xi, ax=ax, lower=0.001/2, ls='--', label='WY') ax.legend() ax.set_xlabel('Txx [$^\circ$C]') ax.set_ylabel('exceedance probability') #savefig if 'savefig' in sys.argv: figname = __file__.replace('.py', f'.png') wysavefig(figname) tt.check(f'**Done**') plt.show()