#!/usr/bin/env python
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta, date
from dateutil.relativedelta import relativedelta
from cftime import DatetimeGregorian
import calendar
import numpy as np
import pandas as pd
import xarray as xr
# TODO replace xds2datetime with date2datenum (needs new date type switch)
# TODO make a fast_reindex daily option
# TODO optimize, refactor, and tests
# MATLAB TIMES
def datematlab2datetime(datenum_matlab):
    'Converts a MATLAB datenum into a python datetime (MATLAB day ordinals are offset 366 days from python ordinals)'
    d = datetime.fromordinal(int(datenum_matlab)) + \
        timedelta(days=float(datenum_matlab % 1)) - \
        timedelta(days=366)
return d
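
# Usage sketch (not part of the original module; illustrative value):
# 737791.5 is MATLAB's datenum for 2020-01-01 12:00, so the conversion above
# should yield the matching python datetime.
def _example_datematlab2datetime():
    'Hypothetical helper: convert a single MATLAB datenum'
    return datematlab2datetime(737791.5)  # -> datetime(2020, 1, 1, 12, 0)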
def datevec2datetime(d_vec):
'''
Returns datetime list from a datevec matrix
    d_vec = [[y1 m1 d1 H1 M1], [y2 m2 d2 H2 M2], ...]
'''
return [datetime(d[0], d[1], d[2], d[3], d[4]) for d in d_vec]
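
# Usage sketch (not part of the original module; illustrative values): build
# python datetimes from a small datevec-style matrix.
def _example_datevec2datetime():
    d_vec = [[2020, 1, 1, 0, 0], [2020, 1, 1, 6, 30]]
    return datevec2datetime(d_vec)  # [datetime(2020, 1, 1, 0, 0), datetime(2020, 1, 1, 6, 30)]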
def DateConverter_Mat2Py(datearray_matlab):
'Parses matlab datenum array to python datetime list'
return [datematlab2datetime(x) for x in datearray_matlab]
# PYTHON TIMES
def xds2datetime(d64):
    'Converts an xr.Dataset scalar np.datetime64[ns] value into a datetime (date precision only)'
    # TODO: hour minutes and seconds
    return datetime(int(d64.dt.year), int(d64.dt.month), int(d64.dt.day))
def npdt64todatetime(dt64):
'converts np.datetime64[ns] into datetime'
    ts = (dt64 - np.datetime64('1970-01-01T00:00:00')) / np.timedelta64(1, 's')
return datetime(1970, 1, 1) + timedelta(seconds=ts)
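
# Usage sketch (not part of the original module; illustrative value): convert a
# single numpy datetime64 to a python datetime through the epoch-seconds route above.
def _example_npdt64todatetime():
    return npdt64todatetime(np.datetime64('2020-06-15T18:00:00'))  # -> datetime(2020, 6, 15, 18, 0)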
# declare aux function for generating datetime.datetime array
def generate_datetimes(t0, t1, dtype='datetime64[h]'):
    'Generates a datetime.datetime array from t0 to t1 (both endpoints included) at hourly or daily resolution'
# lambda vectorized: datetime.datetime from utc timestamp
lb_dfts = lambda t: datetime.utcfromtimestamp(t)
np_dfts = np.vectorize(lb_dfts)
# mount times array
dtd = {
'datetime64[h]': timedelta(hours=1),
'datetime64[D]': timedelta(days=1),
}
tdd = dtd[dtype]
tg = np.arange(t0, t1 + tdd, dtype=dtype)
    tg = (tg - np.datetime64('1970-01-01T00:00:00')) / np.timedelta64(1, 's')
tg = np_dfts(tg)
return tg
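
# Usage sketch (not part of the original module; illustrative values): one day of
# hourly datetimes, both endpoints included.
def _example_generate_datetimes():
    return generate_datetimes(datetime(2020, 1, 1), datetime(2020, 1, 2))  # 25 hourly datetimes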
def fast_reindex_hourly(xds_data):
'''
    Fast and safe method to reindex (pad) an xarray.Dataset to hourly data
    xds_data - xarray.Dataset with a time coordinate
'''
    # aux function: compute time deltas as an integer array of hours
def get_deltas(t):
t_df = (np.diff(t))
t_tp = type(t_df[0])
        # total number of hours in each date interval
if t_tp == np.timedelta64:
d = t_df.astype('timedelta64[h]') / np.timedelta64(1,'h')
d = d.astype(int)
else:
            # lambda vectorized: datetime.timedelta to number of hours
            lb_ghs = lambda t: t.days * 24 + t.seconds // 3600
np_ghs = np.vectorize(lb_ghs)
d = np_ghs(t_df)
return d
# generate output time array
time_base = xds_data.time.values[:]
t0, t1 = date2datenum(time_base[0]), date2datenum(time_base[-1])
time_h = generate_datetimes(t0, t1, dtype='datetime64[h]')
    # repeat each variable's data over the new hourly time axis (pad)
ds = get_deltas(time_base)
dv = {}
for vn in xds_data.keys():
b = xds_data[vn].values[:]
p = np.append(np.repeat(b[:-1], ds), b[-1])
dv[vn] = (('time',), p)
# return xarray.Dataset
xds_out = xr.Dataset(dv, coords={'time': time_h})
return xds_out
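
# Usage sketch (not part of the original module; illustrative values): pad a tiny
# 6-hourly dataset to hourly resolution; each value is repeated forward in time.
def _example_fast_reindex_hourly():
    t = [datetime(2020, 1, 1) + timedelta(hours=6 * i) for i in range(4)]
    xds = xr.Dataset({'hs': (('time',), np.arange(4.0))}, coords={'time': t})
    return fast_reindex_hourly(xds)  # 19 hourly records between 00:00 and 18:00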
def fast_reindex_hourly_nsim(xds_data):
'''
    Fast and safe method to reindex (pad) an xarray.Dataset to hourly data
    xds_data - xarray.Dataset with time and n_sim coordinates
'''
    # aux function: compute time deltas as an integer array of hours
def get_deltas(t):
t_df = (np.diff(t))
t_tp = type(t_df[0])
        # total number of hours in each date interval
if t_tp == np.timedelta64:
d = t_df.astype('timedelta64[h]') / np.timedelta64(1,'h')
d = d.astype(int)
else:
            # lambda vectorized: datetime.timedelta to number of hours
            lb_ghs = lambda t: t.days * 24 + t.seconds // 3600
np_ghs = np.vectorize(lb_ghs)
d = np_ghs(t_df)
return d
# generate output time array
time_base = xds_data.time.values[:]
t0, t1 = date2datenum(time_base[0]), date2datenum(time_base[-1])
time_h = generate_datetimes(t0, t1, dtype='datetime64[h]')
    # repeat each variable's data over the new hourly time axis (pad)
ds = get_deltas(time_base)
# iterate n_sim dimension
l_sim = []
for s in xds_data.n_sim:
xds_d = xds_data.sel(n_sim=s)
# fast reindex
dv = {}
for vn in xds_d.keys():
b = xds_d[vn].values[:]
p = np.append(np.repeat(b[:-1], ds), b[-1])
dv[vn] = (('time',), p)
        # build the reindexed xarray.Dataset for this simulation
xds_rd = xr.Dataset(dv, coords={'time': time_h})
l_sim.append(xds_rd)
# concat simulations
xds_out = xr.concat(l_sim, 'n_sim')
return xds_out
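
# Usage sketch (not part of the original module; illustrative values): pad a
# 6-hourly dataset with two simulations; each n_sim slice is reindexed and concatenated.
def _example_fast_reindex_hourly_nsim():
    t = [datetime(2020, 1, 1) + timedelta(hours=6 * i) for i in range(4)]
    xds = xr.Dataset(
        {'hs': (('n_sim', 'time'), np.arange(8.0).reshape(2, 4))},
        coords={'time': t, 'n_sim': [0, 1]},
    )
    return fast_reindex_hourly_nsim(xds)  # dims: n_sim=2, time=19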
def xds_reindex_daily(xds_data, dt_lim1=None, dt_lim2=None):
'''
Reindex xarray.Dataset to daily data between optional limits
'''
# TODO: remove limits from inside function
    # TODO: remove this switch and use date2datenum
if isinstance(xds_data.time.values[0], datetime):
xds_dt1 = xds_data.time.values[0]
xds_dt2 = xds_data.time.values[-1]
else:
# parse xds times to python datetime
xds_dt1 = xds2datetime(xds_data.time[0])
xds_dt2 = xds2datetime(xds_data.time[-1])
# cut data at limits
if dt_lim1:
xds_dt1 = max(xds_dt1, dt_lim1)
if dt_lim2:
xds_dt2 = min(xds_dt2, dt_lim2)
# number of days
num_days = (xds_dt2-xds_dt1).days+1
# reindex xarray.Dataset
return xds_data.reindex(
{'time': [xds_dt1 + timedelta(days=i) for i in range(num_days)]},
method = 'pad',
)
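
# Usage sketch (not part of the original module; illustrative values): pad a
# monthly dataset to daily resolution between its own first and last dates.
def _example_xds_reindex_daily():
    t = [datetime(2020, 1, 1), datetime(2020, 2, 1), datetime(2020, 3, 1)]
    xds = xr.Dataset({'sst': (('time',), np.arange(3.0))}, coords={'time': t})
    return xds_reindex_daily(xds)  # 61 daily records (2020-01-01 .. 2020-03-01)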
def xds_reindex_monthly(xds_data):
'''
Reindex xarray.Dataset to monthly data
'''
    # TODO: remove this switch and use date2datenum
if isinstance(xds_data.time.values[0], datetime):
xds_dt1 = xds_data.time.values[0]
xds_dt2 = xds_data.time.values[-1]
else:
# parse xds times to python datetime
xds_dt1 = xds2datetime(xds_data.time[0])
xds_dt2 = xds2datetime(xds_data.time[-1])
# number of months
num_months = (xds_dt2.year - xds_dt1.year)*12 + \
xds_dt2.month - xds_dt1.month
# reindex xarray.Dataset
return xds_data.reindex(
        {'time': [xds_dt1 + relativedelta(months=i) for i in range(num_months + 1)]},
method = 'pad',
)
def xds_common_dates_daily(xds_list):
'''
    returns a daily datetime array between the common date limits of a list of
    xarray.Dataset
'''
d1, d2 = xds_limit_dates(xds_list)
return [d1 + timedelta(days=i) for i in range((d2-d1).days+1)]
def xds_limit_dates(xds_list):
'''
    returns the common (overlapping) datetime limits of a list of xarray.Dataset
'''
d1 = None
d2 = None
for xds_e in xds_list:
        # TODO: remove this switch and use date2datenum
if isinstance(xds_e.time.values[0], datetime):
xds_e_dt1 = xds_e.time.values[0]
xds_e_dt2 = xds_e.time.values[-1]
else:
# parse xds times to python datetime
xds_e_dt1 = xds2datetime(xds_e.time[0])
xds_e_dt2 = xds2datetime(xds_e.time[-1])
        if d1 is None:
d1 = xds_e_dt1
d2 = xds_e_dt2
d1 = max(xds_e_dt1, d1)
d2 = min(xds_e_dt2, d2)
return d1, d2
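
# Usage sketch (not part of the original module; illustrative values): common
# (overlapping) date limits of two partially overlapping daily datasets.
def _example_xds_limit_dates():
    t_a = [datetime(2020, 1, 1) + timedelta(days=i) for i in range(10)]
    t_b = [datetime(2020, 1, 5) + timedelta(days=i) for i in range(10)]
    xds_a = xr.Dataset({'v': (('time',), np.zeros(10))}, coords={'time': t_a})
    xds_b = xr.Dataset({'v': (('time',), np.zeros(10))}, coords={'time': t_b})
    return xds_limit_dates([xds_a, xds_b])  # (datetime(2020, 1, 5), datetime(2020, 1, 10))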
def xds_further_dates(xds_list):
'''
    returns the furthest (union) datetime limits of a list of xarray.Dataset
'''
d1 = None
d2 = None
for xds_e in xds_list:
        # TODO: remove this switch and use date2datenum
if isinstance(xds_e.time.values[0], datetime):
xds_e_dt1 = xds_e.time.values[0]
xds_e_dt2 = xds_e.time.values[-1]
else:
# parse xds times to python datetime
xds_e_dt1 = xds2datetime(xds_e.time[0])
xds_e_dt2 = xds2datetime(xds_e.time[-1])
        if d1 is None:
d1 = xds_e_dt1
d2 = xds_e_dt2
d1 = min(xds_e_dt1, d1)
d2 = max(xds_e_dt2, d2)
return d1, d2
def date2yearfrac(d):
    'Returns date d as a fraction of its year'
# get timetuple obj
if isinstance(d, datetime):
ttup = d.timetuple()
elif isinstance(d, np.datetime64):
ttup = npdt64todatetime(d).timetuple()
elif isinstance(d, DatetimeGregorian):
ttup = d.timetuple()
# total number of days in year
year_ndays = 366.0 if calendar.isleap(ttup.tm_year) else 365.0
return ttup.tm_yday / year_ndays
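
# Usage sketch (not part of the original module; illustrative value): 2020-07-01
# is day 183 of a 366-day leap year, so the year fraction is 0.5.
def _example_date2yearfrac():
    return date2yearfrac(datetime(2020, 7, 1))  # -> 0.5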
def date2datenum(d):
    'Returns date d (any supported type) as a python datetime'
# TODO: new type switch for xarray.core numpy.datetime64
# TODO: rename to date2datetime
if isinstance(d, datetime):
return d
# else get timetuple
elif isinstance(d, np.datetime64):
ttup = npdt64todatetime(d).timetuple()
elif isinstance(d, DatetimeGregorian):
ttup = d.timetuple()
# return datetime
return datetime(*ttup[:6])
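
# Usage sketch (not part of the original module; illustrative values): the
# supported date types all map to the same python datetime.
def _example_date2datenum():
    d_np = date2datenum(np.datetime64('2020-01-01T06:00:00'))
    d_cf = date2datenum(DatetimeGregorian(2020, 1, 1, 6))
    return d_np, d_cf  # both -> datetime(2020, 1, 1, 6, 0)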
def get_years_months_days(time):
'''
    Returns years, months, and days of time in separate lists
    (used to avoid problems with date types)
'''
t0 = time[0]
if isinstance(t0, (date, datetime, DatetimeGregorian)):
ys = np.asarray([x.year for x in time])
ms = np.asarray([x.month for x in time])
ds = np.asarray([x.day for x in time])
else:
tpd = pd.DatetimeIndex(time)
ys = tpd.year
ms = tpd.month
ds = tpd.day
return ys, ms, ds
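
# Usage sketch (not part of the original module; illustrative values): split a
# small datetime list into year, month and day arrays.
def _example_get_years_months_days():
    t = [datetime(2020, 1, 15), datetime(2020, 2, 15)]
    return get_years_months_days(t)  # (array([2020, 2020]), array([1, 2]), array([15, 15]))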
# aux. functions for teslakit hourly output
def repair_times_hourly(xds):
    'Ensures the xarray.Dataset time index is rounded to the nearest hour and contains no repeated values'
# round times to hour (only for np.datetime64)
if isinstance(xds.time.values[0], np.datetime64):
xds['time'] = xds['time'].dt.round('H')
# remove duplicates
    _, ix = np.unique(xds['time'], return_index=True)
    xds = xds.isel(time=ix)
return xds
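
# Usage sketch (not part of the original module; illustrative values): off-hour
# and duplicated times are rounded to the hour and deduplicated.
def _example_repair_times_hourly():
    t = np.array(['2020-01-01T00:00:30', '2020-01-01T01:00:00', '2020-01-01T01:00:00'],
                 dtype='datetime64[ns]')
    xds = xr.Dataset({'v': (('time',), np.arange(3.0))}, coords={'time': t})
    return repair_times_hourly(xds)  # 2 records: 00:00 and 01:00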
def hours_since(base_date, target_dates):
    'fast method for locating "target_dates" as whole hours elapsed since "base_date"'
return (target_dates - base_date).astype('timedelta64[h]').astype(int)
def add_max_storms_mask(xds, times_max_storms, name_mask='max_storms'):
    'fast method for adding a "max_storms" mask to an hourly xarray.Dataset'
# find max storm indexes and make boolean mask
ixs = hours_since(xds.time.values[0], times_max_storms)
    np_mask = np.zeros(len(xds.time.values), dtype='bool')
    np_mask[ixs] = True
# add mask to dataset
    xds[name_mask] = (('time',), np_mask)
return xds
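
# Usage sketch (not part of the original module; illustrative values): flag two
# storm-peak hours inside a 24-hour dataset with a boolean mask.
def _example_add_max_storms_mask():
    t = np.arange('2020-01-01T00', '2020-01-02T00', dtype='datetime64[h]')
    xds = xr.Dataset({'hs': (('time',), np.arange(t.size, dtype=float))}, coords={'time': t})
    t_max = np.array(['2020-01-01T06', '2020-01-01T18'], dtype='datetime64[h]')
    return add_max_storms_mask(xds, t_max)  # adds boolean 'max_storms', True at hours 6 and 18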