Source code for bluemath_tk.teslakit.rbf

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# common
import time

# pip
import numpy as np
from scipy.optimize import fminbound
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
import xarray as xr

# tk
from .mda import Normalize

# RBF Phi functions
def rbfphi_linear(r, const):
    return r

def rbfphi_cubic(r, const):
    return r*r*r

def rbfphi_gaussian(r, const):
    return np.exp(-0.5*r*r/(const*const))

def rbfphi_multiquadratic(r, const):
    return np.sqrt(1+r*r/(const*const))
def rbfphi_thinplate(r, const):
    return r*r*np.log(r+1)
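
# Illustrative sketch (hypothetical helper, not part of the original module): the
# Gaussian kernel above evaluates phi(r) = exp(-r^2 / (2*const^2)), so `const`
# sets how quickly a node's influence decays with distance.
def _example_phi_gaussian():
    r = np.array([0.0, 1.0, 2.0])
    # exp(0), exp(-0.5), exp(-2)  ->  approx. [1.0, 0.607, 0.135]
    return rbfphi_gaussian(r, 1.0)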

def RBF_Assemble(x, phi, const, smooth):
    '''Assembles the RBF + polynomial interpolation matrix for nodes x (dim, n)'''

    dim, n = x.shape

    # pairwise kernel evaluations (symmetric), smoothing term subtracted from the diagonal
    A = np.zeros((n, n))
    for i in range(n):
        for j in range(i+1):
            r = np.linalg.norm(x[:,i] - x[:,j])
            temp = phi(r, const)
            A[i,j] = temp
            A[j,i] = temp
        A[i,i] = A[i,i] - smooth

    # polynomial part
    P = np.hstack((np.ones((n,1)), x.T))
    A = np.vstack((
        np.hstack((A, P)),
        np.hstack((P.T, np.zeros((dim+1, dim+1)))),
    ))

    return A
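
# Illustrative sketch (hypothetical helper): for n nodes in dim dimensions the
# assembled system is the symmetric (n+dim+1) x (n+dim+1) block matrix
# [[Phi - smooth*I, P], [P.T, 0]], where P = [1 | x.T] carries the affine terms.
def _example_assemble_shape():
    x = np.random.rand(2, 5)                         # dim=2, n=5
    A = RBF_Assemble(x, rbfphi_gaussian, 0.5, 0)
    return A.shape                                   # (8, 8) = (5+2+1, 5+2+1)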

def CostEps(ep, x, y):
    '''Cost of the Gaussian RBF fit for shape parameter ep (minimized with fminbound)'''

    # rbf coeff and A matrix calculation
    rbf_coeff, A = CalcRBF_Coeff(ep, x, y)

    # rbf cost calculation
    m, n = x.shape
    A = A[:n, :n]
    invA = np.linalg.pinv(A)

    # residual of y after removing the constant and linear terms
    m1, n1 = rbf_coeff.shape
    kk = y - rbf_coeff[m1-m-1]
    for i in range(m):
        kk = kk - rbf_coeff[m1-m+i] * x[i,:]

    ceps = np.dot(invA, kk) / np.diagonal(invA)
    yy = np.linalg.norm(ceps)

    return yy
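
# Illustrative sketch (hypothetical helper, mirroring the calls made in
# RBF_Reconstruction below): the Gaussian shape parameter is chosen by minimizing
# CostEps over a bounded interval, which behaves as a leave-one-out style error
# estimate of the fit.
def _example_optimize_sigma():
    x = np.random.rand(2, 30)                        # normalized inputs, shape (dim, n)
    y = np.sin(x[0, :]) + x[1, :]
    return fminbound(CostEps, 0.1, 0.7, args=(x, y))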

def CalcRBF_Coeff(ep, x, y):
    '''Solves the Gaussian RBF system for nodes x (dim, n) and values y (n,)'''

    # rbf coeff calculation
    m, n = x.shape
    A = RBF_Assemble(x, rbfphi_gaussian, ep, 0)
    b = np.concatenate((y, np.zeros((m+1,)))).reshape(-1, 1)
    rbfcoeff, _, _, _ = np.linalg.lstsq(A, b, rcond=None)  # inverse

    return rbfcoeff, A
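
# Illustrative sketch (hypothetical helper): the returned coefficient vector has
# shape (n + dim + 1, 1): n kernel weights, then the constant term, then dim
# linear terms, matching the indexing used in RBF_Interpolation below.
def _example_coeff_layout():
    x = np.random.rand(2, 10)                        # dim=2, n=10
    y = np.random.rand(10)
    rbf_coeff, A = CalcRBF_Coeff(0.3, x, y)
    return rbf_coeff.shape, A.shape                  # (13, 1), (13, 13)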

def RBF_Interpolation(rbf_constant, rbf_coeff, nodes, x):
    '''Evaluates the fitted Gaussian RBF surface at points x (dim, n_points)'''

    phi = rbfphi_gaussian  # gaussian RBFs

    rbf_coeff = rbf_coeff.flatten()
    dim, n = nodes.shape
    dim_p, n_p = x.shape

    f = np.zeros(n_p)
    r = np.zeros(n)

    for i in range(n_p):

        # distances from the evaluation point to every node
        r = np.linalg.norm(
            np.repeat([x[:,i]], n, axis=0) - nodes.T,
            axis=1
        )

        # kernel weights + constant term
        s = rbf_coeff[n] + np.sum(rbf_coeff[:n] * phi(r, rbf_constant))

        # linear part
        for k in range(dim):
            s = s + rbf_coeff[k+n+1] * x[k,i]

        f[i] = s

    return f
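
# Illustrative sketch (hypothetical helper): with smooth=0 the fit interpolates
# the training values, so evaluating back at the fitting nodes should reproduce
# y up to numerical error.
def _example_fit_and_evaluate():
    nodes = np.random.rand(2, 20)                    # (dim, n) fitting points
    y = nodes[0, :]**2 + nodes[1, :]
    sigma = 0.4
    rbf_coeff, _ = CalcRBF_Coeff(sigma, nodes, y)
    y_hat = RBF_Interpolation(sigma, rbf_coeff, nodes, nodes)
    return np.max(np.abs(y_hat - y))                 # expected to be near zero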

def RBF_Reconstruction(
    subset, ix_scalar_subset, ix_directional_subset,
    target, ix_scalar_target, ix_directional_target,
    dataset):
    '''
    Radial Basis Function (Gaussian) interpolator.

    subset                 - subset used for fitting RBF (dim_input)
    ix_scalar_subset       - scalar column indexes for subset
    ix_directional_subset  - directional column indexes for subset

    target                 - target used for fitting RBF (dim_output)
    ix_scalar_target       - scalar column indexes for target
    ix_directional_target  - directional column indexes for target

    dataset                - dataset used for RBF interpolation (dim_input)
    '''

    # parameters
    sigma_min = 0.1
    sigma_max = 0.7

    # normalize subset and dataset
    dataset_norm, mins, maxs = Normalize(
        dataset, ix_scalar_subset, ix_directional_subset)
    subset_norm, _, _ = Normalize(
        subset, ix_scalar_subset, ix_directional_subset, mins, maxs)

    # output storage
    output = np.zeros((dataset.shape[0], target.shape[1]))

    # RBF scalar variables
    for ix in ix_scalar_target:
        v = target[:, ix]

        # minimize RBF cost function
        t0 = time.time()  # time counter
        opt_sigma = fminbound(
            CostEps, sigma_min, sigma_max,
            args=(subset_norm.T, v)
        )
        t1 = time.time()  # optimization time

        # calculate RBF coeff
        rbf_coeff, _ = CalcRBF_Coeff(opt_sigma, subset_norm.T, v)

        # RBF interpolation
        t2 = time.time()  # time counter
        output[:, ix] = RBF_Interpolation(
            opt_sigma, rbf_coeff, subset_norm.T, dataset_norm.T)
        t3 = time.time()  # interpolation time

        print(
            'ix_scalar: {0}, optimization: {1:.2f} | interpolation: {2:.2f}'.format(
                ix, t1-t0, t3-t2)
        )

    # RBF directional variables
    for ix in ix_directional_target:
        v = target[:, ix]

        # x and y directional variable components
        vdg = np.pi/2 - v * np.pi/180
        pos = np.where(vdg < -np.pi)[0]
        vdg[pos] = vdg[pos] + 2 * np.pi
        vdx = np.cos(vdg)
        vdy = np.sin(vdg)

        # minimize RBF cost function
        t0 = time.time()  # time counter
        opt_sigma_x = fminbound(
            CostEps, sigma_min, sigma_max,
            args=(subset_norm.T, vdx)
        )
        opt_sigma_y = fminbound(
            CostEps, sigma_min, sigma_max,
            args=(subset_norm.T, vdy)
        )
        t1 = time.time()  # optimization time

        # calculate RBF coeff
        rbf_coeff_x, _ = CalcRBF_Coeff(opt_sigma_x, subset_norm.T, vdx)
        rbf_coeff_y, _ = CalcRBF_Coeff(opt_sigma_y, subset_norm.T, vdy)

        # RBF interpolation
        t2 = time.time()  # time counter
        output_x = RBF_Interpolation(
            opt_sigma_x, rbf_coeff_x, subset_norm.T, dataset_norm.T)
        output_y = RBF_Interpolation(
            opt_sigma_y, rbf_coeff_y, subset_norm.T, dataset_norm.T)
        t3 = time.time()  # interpolation time

        # join x and y components back into degrees
        out = np.arctan2(output_y, output_x) * 180/np.pi
        out = 90 - out
        pos = np.where(out < 0)[0]
        out[pos] = out[pos] + 360
        output[:, ix] = out

        print(
            'ix_directional: {0}, optimization: {1:.2f} | interpolation: {2:.2f}'.format(
                ix, t1-t0, t3-t2)
        )

    return output
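
# Illustrative sketch (hypothetical synthetic data): subset and dataset share the
# input columns, target holds one output column per variable, and directional
# columns are given in degrees.
def _example_reconstruction():
    subset = np.random.rand(50, 2)                   # fitting inputs (n_subset, dim_input)
    target = np.column_stack((
        subset.sum(axis=1),                          # scalar output (column 0)
        np.random.rand(50) * 360,                    # directional output in degrees (column 1)
    ))
    dataset = np.random.rand(500, 2)                 # points to reconstruct (n_dataset, dim_input)
    output = RBF_Reconstruction(
        subset, [0, 1], [], target, [0], [1], dataset)
    return output.shape                              # (500, 2)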

def RBF_Validation(
    subset, ix_scalar_subset, ix_directional_subset,
    target, ix_scalar_target, ix_directional_target,
    n_splits=3, shuffle=False):
    '''
    Radial Basis Function (Gaussian) k-fold mean squared error

    subset                 - subset used for fitting RBF (dim_input)
    ix_scalar_subset       - scalar column indexes for subset
    ix_directional_subset  - directional column indexes for subset

    target                 - target used for fitting RBF (dim_output)
    ix_scalar_target       - scalar column indexes for target
    ix_directional_target  - directional column indexes for target
    '''

    # get train-test combinations using kfold from sklearn
    kF = KFold(n_splits=n_splits, shuffle=shuffle, random_state=None)

    l_mse = []
    l_trn_ix = []
    l_tst_ix = []
    for c, (train_index, test_index) in enumerate(kF.split(subset)):
        print('RBFs Kfold Validation: {0}/{1}'.format(c+1, n_splits))

        # get train and test data
        X_train, X_test = subset[train_index], subset[test_index]
        Y_train, Y_test = target[train_index], target[test_index]

        # fit RBFs with train data and interpolate test data
        Y_rbf = RBF_Reconstruction(
            X_train, ix_scalar_subset, ix_directional_subset,
            Y_train, ix_scalar_target, ix_directional_target,
            X_test)

        # calculate mean squared error
        mse = mean_squared_error(Y_test, Y_rbf)
        print('mean squared error : {0}'.format(mse))
        print()

        # store data for output
        l_mse.append(mse)
        l_trn_ix.append(train_index)
        l_tst_ix.append(test_index)

    # return validation data (NaN-padded, since folds may differ in size)
    sx = np.max([len(r) for r in l_trn_ix])
    sy = len(l_trn_ix)
    np_train = np.zeros((sx, sy)) * np.nan
    for c, r in enumerate(l_trn_ix):
        np_train[:len(r), c] = r

    sx = np.max([len(r) for r in l_tst_ix])
    sy = len(l_tst_ix)
    np_test = np.zeros((sx, sy)) * np.nan
    for c, r in enumerate(l_tst_ix):
        np_test[:len(r), c] = r

    return xr.Dataset(
        {
            'mean_squared_error': (('n_split',), l_mse),
            'train_index': (('train', 'n_split',), np_train),
            'test_index': (('test', 'n_split',), np_test),
        },
        coords={
            'n_split': np.arange(n_splits),
        },
    )
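
# Illustrative sketch (hypothetical helper, using synthetic arrays like the
# reconstruction sketch above): k-fold validation returns an xarray.Dataset with
# one mean squared error per split plus the train/test indices used.
def _example_validation():
    subset = np.random.rand(50, 2)
    target = np.column_stack((subset.sum(axis=1), np.random.rand(50) * 360))
    ds = RBF_Validation(
        subset, [0, 1], [], target, [0], [1],
        n_splits=3, shuffle=True)
    return float(ds['mean_squared_error'].mean())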