""" Description ----------- Script to process metadata for provision to the Climate Futures project. This processes the monthly output from NorCPM1 forecasts and hindcasts. Author ------ Tarkan A Bilge. """ # ============================================================================ # Imports # ============================================================================ import sys import os import calendar import glob from netCDF4 import Dataset import numpy as np import xesmf as xe import cftime import datetime from dateutil.relativedelta import relativedelta import dask.array as da from functools import partial from multiprocessing import Pool import time # ============================================================================ # Input # ============================================================================ NCPU = 1 # Recommended to run with this as 1 at the moment. COMPRESSION = 4 # Level of output file compression. TRIM = True # Trim off first half month. VARIABLE_LIST = ['TREFHT', 'PRECT', 'PSL', 'SST', 'PRECS', 'U850', 'V850', 'U10', 'UAS', 'VAS'] # also takes argument "all". GRID_FILE = '/projects/NS9873K/norcpm/validation/regrid/conservative_96x144_180x360.nc' OVERWRITE_PREVIOUS = True # False if you are generating hindcasts and it crashes mid-way. # Input files. args = sys.argv if len(args) < 3: print('Usage: python h0_atm.py ') print() print('Example: python h0_atm.py norcpm-cf-system2 20240215') print() exit() SYSTEM = args[1] if SYSTEM == "norcpm-cf-system1": SYSTEM_ID = '1' elif SYSTEM == "norcpm-cf-system2": SYSTEM_ID = '2' else: print('No predefined SYSTEM_ID for system ',SYSTEM) exit() START_DATE = args[2] INGLOB = '/projects/NS9873K/norcpm/raw/{:s}/{:s}_hindcast/{:s}_hindcast_{:s}/{:s}_hindcast_{:s}_mem01-60'.format(SYSTEM,SYSTEM,SYSTEM,START_DATE,SYSTEM,START_DATE) # Output files. OUTPUT_DIR = '/projects/NS9873K/www/norcpm/forecast' # Parameter metadata in the form: # input_name : (parameter_number, long_name, units, variable_file_name, # output_name) PARAMETER_DICT = {'TREFHT' : ('167', '2 metre temperature', 'K', '2m_temperature', 't2m'), 'PSL' : ('151', 'Mean sea level pressure', 'Pa', 'mean_sea_level_pressure', 'msl'), 'PRECS' : ('128', 'Mean total snowfall rate', 'm of water equivalent s**-1', 'snowfall', 'mtsfr'), 'U10' : ('207', '10 metre wind speed', 'm s**-1', '10m_wind_speed', 'si10'), 'UAS' : ('165', '10 metre U wind component', 'm s**-1', '10m_u_component_of_wind', 'u10'), 'VAS' : ('166', '10 metre V wind component', 'm s**-1', '10m_v_component_of_wind', 'v10'), 'PRECT' : ('228', 'Mean total precipitation rate', 'm s**-1', 'total_precipitation', 'tprate'), 'SST' : ('34', 'Sea surface temperature', 'K', 'sea_surface_temperature', 'sst'), 'U010': ('999', 'U velocity at 010hPa', 'm s**-1', 'u_component_of_wind_010hPa', 'U010_GDS0_ISBL'), 'U030': ('999', 'U velocity at 030hPa', 'm s**-1', 'u_component_of_wind_030hPa', 'U030_GDS0_ISBL'), 'U050': ('999', 'U velocity at 050hPa', 'm s**-1', 'u_component_of_wind_050hPa', 'U050_GDS0_ISBL'), 'U100': ('999', 'U velocity at 100hPa', 'm s**-1', 'u_component_of_wind_100hPa', 'U100_GDS0_ISBL'), 'U200': ('999', 'U velocity at 200hPa', 'm s**-1', 'u_component_of_wind_200hPa', 'U200_GDS0_ISBL'), 'U300': ('999', 'U velocity at 300hPa', 'm s**-1', 'u_component_of_wind_300hPa', 'U300_GDS0_ISBL'), 'U400': ('999', 'U velocity at 400hPa', 'm s**-1', 'u_component_of_wind_400hPa', 'U400_GDS0_ISBL'), 'U500': ('999', 'U velocity at 500hPa', 'm s**-1', 'u_component_of_wind_500hPa', 'U500_GDS0_ISBL'), 'U700': ('999', 'U velocity at 700hPa', 'm s**-1', 'u_component_of_wind_700hPa', 'U700_GDS0_ISBL'), 'U850': ('999', 'U velocity at 850hPa', 'm s**-1', 'u_component_of_wind_850hPa', 'U850_GDS0_ISBL'), 'U925': ('999', 'U velocity at 925hPa', 'm s**-1', 'u_component_of_wind_925_hPa', 'U925_GDS0_ISBL'), 'V010': ('999', 'V velocity at 010hPa', 'm s**-1', 'v_component_of_wind_010hPa', 'V010_GDS0_ISBL'), 'V030': ('999', 'V velocity at 030hPa', 'm s**-1', 'v_component_of_wind_030hPa', 'V030_GDS0_ISBL'), 'V050': ('999', 'V velocity at 050hPa', 'm s**-1', 'v_component_of_wind_050hPa', 'V050_GDS0_ISBL'), 'V100': ('999', 'V velocity at 100hPa', 'm s**-1', 'v_component_of_wind_100hPa', 'V100_GDS0_ISBL'), 'V200': ('999', 'V velocity at 200hPa', 'm s**-1', 'v_component_of_wind_200hPa', 'V200_GDS0_ISBL'), 'V300': ('999', 'V velocity at 300hPa', 'm s**-1', 'v_component_of_wind_300hPa', 'V300_GDS0_ISBL'), 'V400': ('999', 'V velocity at 400hPa', 'm s**-1', 'v_component_of_wind_400hPa', 'V400_GDS0_ISBL'), 'V500': ('999', 'V velocity at 500hPa', 'm s**-1', 'v_component_of_wind_500hPa', 'V500_GDS0_ISBL'), 'V700': ('999', 'V velocity at 700hPa', 'm s**-1', 'v_component_of_wind_700hPa', 'V700_GDS0_ISBL'), 'V850': ('999', 'V velocity at 850hPa', 'm s**-1', 'v_component_of_wind_850hPa', 'V850_GDS0_ISBL'), 'V925': ('999', 'V velocity at 925hPa', 'm s**-1', 'v_component_of_wind_925hPa', 'V925_GDS0_ISBL'), 'Q010': ('999', 'Specific humidity at 010hPa', 'kg kg**-1', 'specific_humidity_010hPa', 'Q010'), 'Q030': ('999', 'Specific humidity at 030hPa', 'kg kg**-1', 'specific_humidity_030hPa', 'Q030'), 'Q050': ('999', 'Specific humidity at 050hPa', 'kg kg**-1', 'specific_humidity_050hPa', 'Q050'), 'Q100': ('999', 'Specific humidity at 100hPa', 'kg kg**-1', 'specific_humidity_100hPa', 'Q100'), 'Q200': ('999', 'Specific humidity at 200hPa', 'kg kg**-1', 'specific_humidity_200hPa', 'Q200'), 'Q300': ('999', 'Specific humidity at 300hPa', 'kg kg**-1', 'specific_humidity_300hPa', 'Q300'), 'Q400': ('999', 'Specific humidity at 400hPa', 'kg kg**-1', 'specific_humidity_400hPa', 'Q400'), 'Q500': ('999', 'Specific humidity at 500hPa', 'kg kg**-1', 'specific_humidity_500hPa', 'Q500'), 'Q700': ('999', 'Specific humidity at 700hPa', 'kg kg**-1', 'specific_humidity_700hPa', 'Q700'), 'Q850': ('999', 'Specific humidity at 850hPa', 'kg kg**-1', 'specific_humidity_850hPa', 'Q850'), 'Q925': ('999', 'Specific humidity at 925hPa', 'kg kg**-1', 'specific_humidity_925hPa', 'Q925'), 'Z010': ('999', 'Geopotential height at 010hPa', 'm', 'geopotential_height_010hPa', 'Z010'), 'Z030': ('999', 'Geopotential height at 030hPa', 'm', 'geopotential_height_030hPa', 'Z030'), 'Z050': ('999', 'Geopotential height at 050hPa', 'm', 'geopotential_height_050hPa', 'Z050'), 'Z100': ('999', 'Geopotential height at 100hPa', 'm', 'geopotential_height_100hPa', 'Z100'), 'Z200': ('999', 'Geopotential height at 200hPa', 'm', 'geopotential_height_200hPa', 'Z200'), 'Z300': ('999', 'Geopotential height at 300hPa', 'm', 'geopotential_height_300hPa', 'Z300'), 'Z400': ('999', 'Geopotential height at 400hPa', 'm', 'geopotential_height_400hPa', 'Z400'), 'Z500': ('999', 'Geopotential height at 500hPa', 'm', 'geopotential_height_500hPa', 'Z500'), 'Z700': ('999', 'Geopotential height at 700hPa', 'm', 'geopotential_height_700hPa', 'Z700'), 'Z850': ('999', 'Geopotential height at 850hPa', 'm', 'geopotential_height_850hPa', 'Z850'), 'Z925': ('999', 'Geopotential height at 925hPa', 'm', 'geopotential_height_925hPa', 'Z925'), 'T010': ('999', 'Temperature at 010hPa', 'K', 'temperature_010hPa', 'T010'), 'T030': ('999', 'Temperature at 030hPa', 'K', 'temperature_030hPa', 'T030'), 'T050': ('999', 'Temperature at 050hPa', 'K', 'temperature_050hPa', 'T050'), 'T100': ('999', 'Temperature at 100hPa', 'K', 'temperature_100hPa', 'T100'), 'T200': ('999', 'Temperature at 200hPa', 'K', 'temperature_200hPa', 'T200'), 'T300': ('999', 'Temperature at 300hPa', 'K', 'temperature_300hPa', 'T300'), 'T400': ('999', 'Temperature at 400hPa', 'K', 'temperature_400hPa', 'T400'), 'T500': ('999', 'Temperature at 500hPa', 'K', 'temperature_500hPa', 'T500'), 'T700': ('999', 'Temperature at 700hPa', 'K', 'temperature_700hPa', 'T700'), 'T850': ('999', 'Temperature at 850hPa', 'K', 'temperature_850hPa', 'T850'), 'T925': ('999', 'Temperature at 925hPa', 'K', 'temperature_925hPa', 'T925'), 'QFLX': ('999', 'Surface water flux', 'kg m**-2 s**-1', 'surface_water_flux', 'QFLX'), 'FLUT': ('999', 'Upwelling longwave flux at top of model', 'W m**-2', 'upwelling_longwave_top', 'FLUT') } if VARIABLE_LIST == "all": VARIABLE_LIST = [x for x in list(PARAMETER_DICT.keys())] # ============================================================================ # Functions # ============================================================================ def naive_noleap_to_gregorian(time_points, start_date): """ Converts noleap dates to gregorian without adding leap day. Aligns the first date on the two calendars, and then converts the noleap dates to gregorian dates making the assumption that there are no leap days in the input dates. Parameters ---------- time_points : array_like(float) Numbers that correspond to 'days since' a certain date. dates : array_like(cftime._cftime.DatetimeNoLeap) Returns ------- new_time_points : array_like(float) Numbers that correspond to 'days since 1850-01-01 00:00:00' new_dates : array_like(cftime._cftime.DatetimeGregorian) """ start_point = time_points[0] start_date_gregorian = cftime.datetime(start_date.year, start_date.month, start_date.day, start_date.hour, calendar='gregorian') start_num = cftime.date2num(start_date_gregorian, units='days since 1850-01-01 00:00:00') new_time_points = [start_num + i for i in range(0, len(time_points))] new_dates = cftime.num2date(new_time_points, 'days since 1850-01-01 00:00:00', 'gregorian') return new_time_points, new_dates def change_calendar(time_variable): # Get the time variable units and calendar. unit_string_in = time_variable.units calendar_in = time_variable.calendar # Check calendar is a 365 day calendar as expected. if calendar_in not in ['365_day', 'noleap']: raise ValueError('The input calendar is not a 365_day calendar,' + \ 'which is expected for NorCPM output.') # Get the time points from the variable. time_points_in = time_variable[:] # Convert these into dates. dates_in = cftime.num2date(time_points_in, unit_string_in, calendar_in) start_date_in = dates_in[0] # Use a naive conversion which assumes the time points don't correspond to a leap day. # Note that for monthly files, the dates are always the first of the month, so never a leap day. time_points_out, dates_out = naive_noleap_to_gregorian( time_points_in, start_date_in) # The request is now that the monthly data is given by the date of the first day of the month. # In the raw model output, the timestamp for the monthly files is at the end of the month. # Therefore a shift of one month is introduced here. datetimes_out = [datetime.datetime(1850, 1, 1) + datetime.timedelta(int(x)) for x in time_points_out] shifted_datetimes_out = [x - relativedelta(months=1) for x in datetimes_out] time_points_out = [(x - datetime.datetime(1850, 1, 1)).days for x in shifted_datetimes_out] return time_points_out def create_dimensions(dataset, ntime, nmembers): dataset.createDimension('number', nmembers) dataset.createDimension('time', ntime) dataset.createDimension('latitude', 180) dataset.createDimension('longitude', 360) def create_generic_variables(dataset, forecast_hours, lats, lons, nmembers, compression): # Create a time variable. # --------------------------- dataset.createVariable( 'time', 'i4', ('time',), zlib=True, complevel=compression) dataset.variables['time'].units = 'hours since 1900-01-01 00:00:00.0' dataset.variables['time'].long_name = 'time' dataset.variables['time'].calendar = 'gregorian' dataset.variables['time'][:] = forecast_hours # Create the latitude variable. # ----------------------------- dataset.createVariable( 'latitude', 'f4', ('latitude',), zlib=True, complevel=compression) dataset.variables['latitude'].units = 'degrees_north' dataset.variables['latitude'].long_name = 'latitude' dataset.variables['latitude'][:] = lats # Create the longitude variable. # ----------------------------- dataset.createVariable( 'longitude', 'f4', ('longitude',), zlib=True, complevel=compression) dataset.variables['longitude'].units = 'degrees_east' dataset.variables['longitude'].long_name = 'longitude' dataset.variables['longitude'][:] = lons # Create the ensemble variable. # ----------------------------- dataset.createVariable( 'number', 'i4', ('number',), zlib=True, complevel=compression, fill_value=False) dataset.variables['number'].long_name = 'ensemble_member' dataset.variables['number'][:] = np.array(range(nmembers)) def create_field_variable(dataset, field, variable_name_out, long_name, parameter_number, units, initial_date, compression): dataset.createVariable( variable_name_out, 'f4', ('time', 'number', 'latitude', 'longitude',), zlib=True, fill_value=1.e+20, complevel=compression) dataset.variables[variable_name_out].units = units dataset.variables[variable_name_out].long_name = long_name dataset.variables[variable_name_out].initial_time = initial_date.strftime('%d/%m/%Y %H:%M') dataset.variables[variable_name_out][:] = field def regrid_field(field): # Input grid information. rlat_in = 180/95 rlon_in = 2.5 res_in = (rlon_in, rlat_in) lon_bounds_in = (0 - rlon_in/2, 357.5 + rlon_in/2) lat_bounds_in = (-90 - rlat_in/2 , 90 + rlat_in/2) # Output grid information. res_out = (1, -1) lon_bounds_out = (-180, 180) lat_bounds_out = (90, -90) method = 'conservative' periodic = False # Ensure array is masked, converting NaNs or infs to be masked as True. field = np.ma.masked_invalid(field, copy=False) # Set up input and output grids. grid_in = xe.util.grid_2d(lon_bounds_in[0],lon_bounds_in[1], res_in[0], lat_bounds_in[0],lat_bounds_in[1], res_in[1]) grid_out = xe.util.grid_2d(lon_bounds_out[0], lon_bounds_out[1], res_out[0], lat_bounds_out[0], lat_bounds_out[1], res_out[1]) regridder = xe.Regridder(grid_in, grid_out, method, periodic=periodic, reuse_weights=True, weights=GRID_FILE) # Set up a mask that is 0 for invalid and 1 for valid values. mask = np.where(field.mask, np.zeros(field.shape), np.ones(field.shape)) # Regrid the mask. mask = regridder(mask) # Set values that are more than 50% composed of invalid values to np.nan. mask = 1. / np.where(mask > 0.5, mask, np.nan) field_out = regridder(np.where(field.mask, 0. , field)) * mask lons = np.array(grid_out.lon) lats = np.array(grid_out.lat) # Get 1D versions of latitude and longitude points lons = lons[0, :] lats = lats[:, 0] return field_out, lats, lons def convert_metadata(directory): if directory[-1] != '/': base_directory = os.path.basename(directory) else: base_directory = directory.split('/')[-2] # Get the initial time when hindcast was started (from filename). # This is used for the filename. last_assimilation_date_string = base_directory.split('_')[2] last_assimilation_date = datetime.datetime.strptime( last_assimilation_date_string, '%Y%m%d') # Shift the last_assimilation_date by one. # Now it is the date of the first valid month, instead of last assimilation. filename_date = last_assimilation_date + relativedelta(months=1) filename_date_string = filename_date.strftime("%Y%m") # Output directory. output_dir = os.path.join(OUTPUT_DIR, 'monthly', filename_date_string) if not OVERWRITE_PREVIOUS: # Skip the directory if it exists. if os.path.exists(output_dir): print(f'Skipping directory {output_dir} because it already exists.') return # Create output directory. os.makedirs(output_dir, exist_ok=True) # Print some information. print(f'Converting {directory}') # Get a list of the monthly files for this hindcast date. monthly_file_list = glob.glob(os.path.join(directory, 'atm/hist/', SYSTEM + '_hindcast_*_mem01-60.cam2.h0.*.nc')) monthly_file_list.sort() # Initial date is given as the first of the month. initial_date = cftime.DatetimeGregorian(year=filename_date.year, month=filename_date.month, day=1) # For each variable in the user defined list. for variable_name_in in VARIABLE_LIST: # Print some information. print(f'Variable = {variable_name_in}') # Initialise lists so we can loop through monthly files to combine. total_variable_field_list = [] time_points = [] # For each files in this monthly directory. for i, ifile in enumerate(monthly_file_list): # We exclude the first (half) month for provision to CF project. if TRIM and (i == 0 or i > 6): pass else: # Load file. rootgrp_in = Dataset(ifile, 'r') # Get the time variable. time_variable = rootgrp_in.variables['time'] # Get the original field variable. data_variable = rootgrp_in.variables[variable_name_in] field = data_variable[:] # Change the calendar from 365_day to gregorian. time_points.append(change_calendar(time_variable)[0]) # Convert precipitation and snowfall variable units. # if variable_name_in in ['PRECT', 'PRECS']: # # Convert from m/s to m/day. # field = 60 * 60 * 24 * field # Regrid the field data. field, lats, lons = regrid_field(field) total_variable_field_list.append(field) # Close input file. rootgrp_in.close() # Combine these fields from different months. total_variable_field = np.concatenate(total_variable_field_list, axis=1) # Get the number of ensemble members. nmembers = total_variable_field.shape[0] # Get the number of time values.. ntime = total_variable_field.shape[1] # Make time the first dimension and number the second. total_variable_field = np.swapaxes(total_variable_field, 0, 1) # Get the part of the file name associated with the variable. variable_file_name = PARAMETER_DICT[variable_name_in][3] # Create output file. output_file = os.path.join(output_dir, f'{variable_file_name}_bccr_{SYSTEM_ID}_{filename_date.year}_{filename_date.month:02d}.nc') rootgrp_out = Dataset(output_file, 'w', format='NETCDF4') # Create standard dimensions. create_dimensions(dataset=rootgrp_out, ntime=ntime, nmembers=nmembers) forecast_datetimes = np.array([datetime.datetime(1850, 1, 1) + datetime.timedelta(days=int(x)) for x in time_points]) forecast_deltas = [x - datetime.datetime(1900, 1, 1) for x in forecast_datetimes] forecast_hours = [x.days * 24 for x in forecast_deltas] # Create standard variables. create_generic_variables(rootgrp_out, forecast_hours, lats, lons, nmembers, COMPRESSION) # Create field variable. create_field_variable( dataset=rootgrp_out, field=total_variable_field, variable_name_out=PARAMETER_DICT[variable_name_in][4], long_name=PARAMETER_DICT[variable_name_in][1], parameter_number=PARAMETER_DICT[variable_name_in][0], units=PARAMETER_DICT[variable_name_in][2], initial_date=initial_date, compression=COMPRESSION) # Close output file. rootgrp_out.close() print(f'Completed directory: {directory}') # ============================================================================ # Main # ============================================================================ if __name__ == '__main__': start_time = time.time() # Apply conversion to multiple directories. # ----------------------------------------- # Parallel. if NCPU > 1: directory_list = glob.glob(INGLOB) directory_list.sort() with Pool(NCPU) as p: p.map(convert_metadata, directory_list) # Serial. else: file_list = glob.glob(INGLOB) for file in file_list: convert_metadata(file) print("--- %s seconds ---" % (time.time() - start_time))