Source code for mwr_l12l2.utils.file_utils

import os

import datetime as dt
import numpy as np

from pathlib import Path

import mwr_l12l2
from mwr_l12l2.errors import FilenameError, MWRInputError


[docs]def abs_file_path(*file_path): """ Make a relative file_path absolute in respect to the mwr_l12l2 project directory. Absolute paths wil not be changed """ path = Path(*file_path) if path.is_absolute(): return path return Path(mwr_l12l2.__file__).parent.parent / path
[docs]def replace_path(path, part_to_replace, replace_by): """replace parts of a path with another path (e.g. useful when mounting a path to another location)""" rel_path = os.path.relpath(path, part_to_replace) return os.path.join(replace_by, rel_path)
[docs]def concat_filename(prefix, wigos, inst_id='', suffix='', ext='.nc'): """concatenate a filename according to E-PROFILE standards. Args: prefix: prefix of filename (including tailing _ if needed) wigos: WIGOS-ID (or any other ID) of the station inst_id: instrument ID. Will be appended to wigos using _ if not empty. Defaults to ''. suffix: suffix part after the station and instrument ids (incl. heading _ if needed). Defaults to ''. ext: extension. Defaults to '.nc'. Explicitly specify ext ='' for no extension """ fn = prefix + wigos if inst_id: fn += '_' fn += inst_id fn += suffix fn += ext return fn
[docs]def datestr_from_filename(filename, suffix=''): """return date string from filename, assuming it to be the last date-like block (separated by _) before suffix + ext Accepted dates are in form 'yyyymmddHHMM', 'yyyymmddHHMMSS', 'yyyymmdd', 'yymm' etc. but not separated by -, _ or : Args: filename: filename as str. Can contain path and extension. suffix (optional): suffix of the filename coming after the date and before the extension. Defaults to ''; Returns: string containing the date in same representation as in the filename """ min_date_length = 4 filename_cut = os.path.splitext(filename)[0] if len(suffix) > 0: filename_cut = filename_cut[0:-len(suffix)] # zero length suffix would yield empty str, hence if-clause fn_parts = filename_cut.split('_') for block in reversed(fn_parts): # try to find date str parts of filename, starting at the end if len(block) < min_date_length: continue if block.isdecimal(): return block if block[1:].isdecimal() and len(block)-1 >= min_date_length: # block has ID at start, e.g. _A202206010000 return block[1:] if suffix: raise FilenameError("found no date in '{}' while ignoring suffix '{}'".format(filename, suffix)) else: raise FilenameError("found no date in '{}'".format(filename))
[docs]def datetime64_from_filename(filename, *args, **kwargs): """get :class:`numpy.datetime64` object from filename. Calling as :func:`datestr_from_fielename`""" accepted_formats = {'yyyymmddHHMMSS': '%Y%m%d%H%M%S', # matching between datestring formats and datetime format 'yyyymmddHHMM': '%Y%m%d%H%M', 'yyyymmddHH': '%Y%m%d%H', 'yyyymmdd': '%Y%m%d', 'yyyymm': '%Y%m'} # only one entry per length of key as right one is picked by length dstr = datestr_from_filename(filename, *args, **kwargs) for dstr_format, datetime_format in accepted_formats.items(): if len(dstr) == len(dstr_format): return np.datetime64(dt.datetime.strptime(dstr, datetime_format)) raise FilenameError('length of date string in filename is {} what does not match any ' 'of the accepted formats ({})'.format(len(dstr), accepted_formats))
[docs]def generate_output_filename(basename, timestamp_src, files_in=None, time=None, ext='nc'): """generate filename in form {basename}{timestamp}.{ext} where timestamp comes from input files or time vector Args: basename: the first part of the filename without the date timestamp_src: source of output file timestamp. Can be 'instamp_min'/'instamp_max' for using smallest/largest timestamp of input filenames (needs 'files_in) or 'time_min'/'time_max' for smallest/largest time in data in format yyyymmddHHMM (needs 'time'). files_in: list of input filenames to processing as a basis for timestamp selection time: :class:`xarray.DataArray` time vector of the data in :class:`numpy.datetime64` format. Assume to be sorted ext (optional): filename extension. Defaults to 'nc'. Empty not permitted. """ format_stamp = '%Y%m%d%H%M' # only used for timestamp_src='time_min' or 'time_max' # handle input if timestamp_src in ['instamp_min', 'instamp_max']: if files_in is None: raise MWRInputError("if timestamp_src is 'instamp_min' or 'instamp_max' input 'files_in' must be given") timestamps = sorted([datestr_from_filename(f) for f in files_in], key=timestamp_to_float) elif timestamp_src in ['time_min', 'time_max']: if time is None: raise MWRInputError("if timestamp_src is 'time_min' or 'time_max' input 'time' must be given") # produce output timestamp if timestamp_src == 'instamp_min': timestamp = timestamps[0] elif timestamp_src == 'instamp_max': timestamp = timestamps[-1] elif timestamp_src == 'time_min': timestamp = time[0].dt.strftime(format_stamp).data elif timestamp_src == 'time_max': timestamp = time[-1].dt.strftime(format_stamp).data elif timestamp_src == 'time_mean': timestamp = np.mean(time).dt.strftime(format_stamp).data else: raise MWRInputError("Known values for 'timestamp_src' are {} but found '{}'".format( "['instamp_min', 'instamp_max', 'time_min', 'time_max']", timestamp_src)) return '{}{}.{}'.format(basename, timestamp, ext)
[docs]def timestamp_to_float(timestamp): """transform timestamp string to a float between 0 and 1 (integer of timestamp normalised by its length)""" return int(timestamp)/10**len(timestamp)
[docs]def dict_to_file(data, file, sep, header=None, remove_brackets=False, remove_parentheses=False, remove_braces=False): """write dictionary contents to a file. One item per line matching keys and values using 'sep'. Args: data: dictionary to write to file in question. Numpy 1d-arrays as values are ok, matrices not file: output file incl. path and extension sep: separator sign between key and value as string. Can include whitespaces around separator. header: header string to write to the head of the file before the first dictionary item. Defaults to None remove_brackets (optional): Remove square brackets [ and ], e.g. from lists, while printing to file. Defaults to False remove_parentheses (optional): Remove parentheses ( and ), e.g. from tuples, while printing to file. Defaults to False remove_braces (optional): Remove curly braces { and } while printing to file. Defaults to False """ with open(file, 'w') as f: if header is not None: f.write(header + '\n') for key, val in data.items(): if isinstance(val, np.ndarray): # ensure numpy arrays are printed with elements separated by commas val = list(val) val = str(val) if remove_brackets: val = val.replace('[', '').replace(']', '') if remove_parentheses: val = val.replace('(', '').replace(')', '') if remove_braces: val = val.replace('{', '').replace('}', '') f.write(sep.join([key, val]) + '\n')
if __name__ == '__main__': fn1 = concat_filename('L2_', '0-20000-0-06610') fn2 = concat_filename('L2_', '0-20000-0-06610', ext='.grb') fn3 = concat_filename('L2_', '0-20000-0-06610', 'A') fn4 = concat_filename('L2_', '0-20000-0-06610', 'A', '_testsuffix') fn5 = concat_filename('L2_', '0-20000-0-06610', suffix='*') fn6 = concat_filename('L2_', '0-20000-0-06610', suffix='*', ext='') dstr1 = datestr_from_filename('L2_0-20000-0-06610_A_20230828_20311201.nc', '20311201') # 20230828 dstr2 = datestr_from_filename('L2_0-20000-0-06610_A_20230828_20311201.nc') # 20311201 # the following shall raise: dstr3 = datestr_from_filename('L2_0-20000-0-06610_A.nc') dd = {'key1': 'val1', 'key2': 'val2'} dict_to_file(dd, 'test.vip', ' = ', '# this is just a test file') replace_path('tropoe/node_0/mwr/mwr.nc', 'tropoe/node_0', '/data') pass