Source code for mwr_l12l2.model.ecmwf.interpret_ecmwf

import os

import numpy as np
import pandas as pd
import xarray as xr

from mwr_l12l2.log import logger
from mwr_l12l2.errors import MWRInputError
from mwr_l12l2.utils.file_utils import abs_file_path
from mwr_l12l2.utils.data_utils import datetime64_to_str

[docs]class ModelInterpreter(object): """class to interpret model data from ECMWF and produce input files for TROPoe Args: file_fc_nc: file containing forecast data in NetCDF. Must have been converted from grib to nc with mwr_l12l2/model/ecmwf/grb_to_nc.sh before. Direct read-in of grib doesn't work because of dim of lnsp file_zg_grb: file containing the geopotential of the lowest model level in grib format station_altitude (optional): Station altitude to inter/extrapolate model data to for sfc data. If not specified the lowest model level will be used for surface data. file_ml (optional): a and b parameters to transform model levels to pressure and altitude grid. If not specified the file named 'ecmwf_model_levels_137.csv' in same dir as interpret_ecmwf.py will be used. """ def __init__(self, file_fc_nc, file_zg_grb=None, station_altitude=None, file_ml=None): self.file_fc_nc = abs_file_path(file_fc_nc) # TODO: check if geopotential from forecast is really equivalent to geopotential from analysis and if is does not crash MARS request # if file_zg_grb is not None: # self.use_zg_from_fc = False # self.file_zg_grb = abs_file_path(file_zg_grb) # else: # self.use_zg_from_fc = True self.use_zg_from_fc = False self.file_zg_grb = abs_file_path(file_zg_grb) self.station_altitude = station_altitude self.file_ml = file_ml if self.file_ml is None: self.file_ml = os.path.join(os.path.dirname(__file__), 'ecmwf_model_levels_137.csv') self.fc = None self.zg_surf = None self.p = None self.p_half = None self.z = None self.time_ref = None # time of reference profiles self.z_ref = None # reference geometrical altitude profile (1d) self.p_ref = None # reference pressure profile (1d) self.q_ref = None # reference humidity profile (1d) self.t_ref = None # reference temperature profile (1d) self.q_err = None # standard deviation of humidity profile within lat/lon area (1d) self.t_err = None # standard deviation of humidity profile within lat/lon area (1d)
[docs] def run(self, time_min, time_max): """run for data closest to selected time in :class:`numpy.datetime64` or :class:`datetime.datetime`""" self.load_data(time_min, time_max) self.hybrid_to_p() self.p_to_z() self.compute_stats()
[docs] def load_data(self, time_min, time_max): """load dataset and reduce to the time of interest (to speed up following computations)""" fc_all = xr.open_dataset(self.file_fc_nc) if time_max - time_min > np.timedelta64(1, 'h'): self.fc = fc_all.sel(time=slice(time_min, time_max)) else: self.fc = fc_all.sel(time=slice(time_min, time_min+np.timedelta64(1,'h'))) logger.info('Using forecast data between '+datetime64_to_str(self.fc.time.min().values, '%Y-%m-%d %H:%M:%S')+' and '+datetime64_to_str(self.fc.time.max().values, '%Y-%m-%d %H:%M:%S')) # Now keeping all models runs between time_min and time_max of the mwr observations, TROPoe does the interpolation if not self.use_zg_from_fc: logger.info('Reading geopotential from analysis data') self.zg_surf = xr.open_dataset(self.file_zg_grb, engine='cfgrib')
[docs] def hybrid_to_p(self): """compute pressure (in Pa) of half and full levels from hybrid levels and fill to self.p and self.p_half""" col_headers = ['level', 'a', 'b'] # expected column headers in self.file_ml # get parameters a and b for hybrid model levels from csv and do some basic input checking level_coeffs = pd.read_csv(self.file_ml) for name in col_headers: if name not in level_coeffs: MWRInputError("the file describing the model level coefficients at '{}' is supposed to contain " "a column header '{}' in the first line".format(self.file_ml, name)) if not (level_coeffs.loc[len(level_coeffs)-1, 'level'] == len(level_coeffs)-1 and level_coeffs.loc[0, 'level'] == 0): MWRInputError("the file describing the model level coefficients at '{}' is supposed to contain all levels " 'from 0 to n_levels (plus one line of column headers at the top)'.format(self.file_ml)) # extract a and b parameters for levels of interest (half levels below and above) ab = np.concatenate([level_coeffs.loc[self.fc.level - 1, ['a', 'b']].to_numpy(), np.expand_dims(level_coeffs.loc[self.fc.level[-1].values, ['a', 'b']].to_numpy(), axis=0)]) a = ab[:, 0] b = ab[:, 1] # calculate pressure at model levels taking the mean between half levels p_surf = np.exp(self.fc.lnsp) # to test pseudo std atm (p, not T/q) use p_surf=np.tile(101325,(27, 48, 3, 3)) p_surf_all = np.tile(p_surf[:, 0:1, :, :], (1, ab.shape[0], 1, 1)) # slice instead of index to preserve dim a_all = np.tile(a[np.newaxis, :, np.newaxis, np.newaxis], (p_surf.shape[0], 1, p_surf.shape[2], p_surf.shape[3])) b_all = np.tile(b[np.newaxis, :, np.newaxis, np.newaxis], (p_surf.shape[0], 1, p_surf.shape[2], p_surf.shape[3])) self.p_half = a_all + b_all*p_surf_all self.p = (self.p_half + np.roll(self.p_half, 1, axis=1))[:, 1:, :, :] / 2
[docs] def p_to_z(self): """transform pressure grid (from :meth:`hybrid_to_p`) to geometrical altitudes according to: https://confluence.ecmwf.int/display/CKB/ERA5%3A+compute+pressure+and+geopotential+on+model+levels%2C+geopotential+height+and+geometric+height """ # noqa: E501 gas_const = 287.06 # gas constant for dry air (Rd) g = 9.80665 # gravitational acceleration of Earth # TODO: check whole function with hypsometric equation and possibly re-write from scratch using it (clearer) # TODO: get rid of RuntimeWarning: divide by zero encountered in log dp = (self.p_half - np.roll(self.p_half, 1, axis=1))[:, 1:, :, :] dlogp = np.log(self.p_half / np.roll(self.p_half, 1, axis=1))[:, 1:, :, :] alpha = 1 - (self.p_half[:, 1:, :, :] / dp) * dlogp # correct for uppermost level dlogp[:, 0, :, :] = np.log(self.p_half[:, 1, :, :] / 0.1) alpha[:, 0, :, :] = np.tile(-np.log(2), (self.p_half.shape[0], self.p_half.shape[2], self.p_half.shape[3])) # transformation to geopotential height zg dzg_half = self.virt_temp() * gas_const * dlogp # diff between geopotential height half levels if not self.use_zg_from_fc: zg_surf_all = np.tile(self.zg_surf.z.values,(self.p_half.shape[0], 1, 1, 1)) #self.zg_surf.z.values[np.newaxis, np.newaxis, :, :] else: # New version to work with geopotential from the fc directly: zg_surf_all = np.expand_dims(self.fc.z.isel(level=0).data,1) dzg_half_with_sfc = np.concatenate((dzg_half, zg_surf_all), axis=1) zg_half = np.flip(np.cumsum(np.flip(dzg_half_with_sfc, axis=1), axis=1), axis=1) # integrate from surface zg = zg_half[:, 1:, :, :] - alpha*gas_const*self.virt_temp() # transformation to geometrical height z self.z = zg / g
[docs] def compute_stats(self): """take reference profiles and uncertainty""" # TODO: make this method more generic # take profiles at centre of lat/lon (at last time selected) as reference self.z_ref = get_ref_profile(self.z) self.q_ref = get_ref_profile(self.fc.q) self.t_ref = get_ref_profile(self.fc.t) self.p_ref = get_ref_profile(self.p) self.time_ref = self.fc.time.values # Compute reference profile of RH self.rh = self.relative_humidity() # take std over lat/lon (at last time selected) as error self.q_err = get_std_profile(self.fc.q) self.t_err = get_std_profile(self.fc.t)
[docs] def virt_temp(self): """return virtual temperature from temperature and specific humidity in self.fc""" return self.fc.t * (1 + 0.609133*self.fc.q)
[docs] def relative_humidity(self): """return relative humidity from pressure, specific humitiy and temperature in self.rh according to https://codes.ecmwf.int/grib/param-db/?id=157""" # Mixed phased parameter (depends on T): alpha = ((self.t_ref - 250.16)/(273.16-250.16))**2 alpha[self.t_ref<250.16] = 0 alpha[self.t_ref>273.16] = 1 # Saturation vapor pressure esat_w = 611.21*np.exp(17.502*((self.t_ref - 273.16) / (self.t_ref -32.19))) esat_i = 611.21*np.exp(22.587*((self.t_ref - 273.16) / (self.t_ref +0.7))) esat = alpha*esat_w + (1-alpha)*esat_i # Ratio between molar masses of water and dry air epsilon = 0.621981 # relative humity return self.p_ref*self.q_ref*(1/epsilon)/(esat*(1 + self.q_ref*(1/epsilon - 1)))
[docs]def get_ref_profile(x): """extract ref profile (last time, centre lat/lon) from a :class:`xarray.DataArray` with dim (time,level,lat,lon)""" if type(x) is not np.ndarray: x = x.values return x[:, :, int(x.shape[-2]/2), int(x.shape[-1]/2)] # CARE: if you edit, also edit central_lat/lon in writer
[docs]def get_std_profile(x): """extract std profile (last time, std in lat/lon) from a :class:`xarray.DataArray` with dim (time,level,lat,lon)""" # flatten lat and lon so that we can take std over all profiles in lat/lon box #x_flat = x.values[-1, :, :, :, ].reshape((-1, x.shape[-2] * x.shape[-1])) #return np.std(x_flat, axis=1) # TODO: Check if we should include the time dimension to get more realistic std dev ? return x.std(dim=['latitude','longitude']).data
# the following trying to interpolate q and t to same altitude grid using scipy's failed with all NaN # from scipy.interpolate import griddata (possibly only because z-grid was not monotonic at this time) # z_flat = self.z.values[-1, :, :, :, ].reshape((-1, self.z.values.shape[-2] * self.z.values.shape[-1])) # q_interp = griddata(z_flat[:-1, :], q_flat[:-1, :], np.tile(self.z_ref.values[:-1,np.newaxis], # (1, self.fc.t.values.shape[-2] * self.fc.t.values.shape[-1]))) if __name__ == '__main__': import datetime as dt model = ModelInterpreter('mwr_l12l2/data/ecmwf_fc/ecmwf_fc_0-20000-0-10393_A_202304250000_converted_to.nc', 'mwr_l12l2/data/ecmwf_fc/ecmwf_z_0-20000-0-10393_A.grb', 'mwr_l12l2/data/output/ecmwf/model_stats_0-20000-0-10393_A_202304251500.nc', 'mwr_l12l2/data/output/ecmwf/model_sfc_0-20000-0-10393_A_202304251500.nc', 127) model.run(dt.datetime(2023, 4, 25, 15, 0, 0)) pass