""" Description ----------- Script to process metadata for provision to the Climate Futures project. This processes the six-hourly output from NorCPM1 forecasts and hindcasts. Author ------ Tarkan A Bilge. """ # ============================================================================ # Imports # ============================================================================ import os import calendar import glob from netCDF4 import Dataset import numpy as np import xesmf as xe import cftime import datetime from dateutil.relativedelta import relativedelta import dask.array as da import dask.distributed from functools import partial from multiprocessing import Pool import time # ============================================================================ # Input # ============================================================================ NCPU = 4 # Set to 1 for forecast. Can be increased for hindcasts (multiple input files), try 2 or 3. COMPRESSION = 4 # Compression level of output files. TRIM = True # Trim off first half month, and leave next 125 days (True for CF). VARIABLE_LIST = ['TREFHT', 'PSL', 'UAS', 'VAS'] GRID_FILE = '/projects/NS9039K/projects/climate_futures/seasonal_forecast/misc/conservative_96x144_180x360.nc' OVERWRITE_PREVIOUS = True # False if you are generating hindcasts and it crashes mid-way. # Input files. # INGLOB = '/projects/NS9039K/shared/ClimateFutures/new/norcpm-cf-system1_hindcast_20221115_mem01-60/' # For reading in forecast. INGLOB = '/projects/NS9873K/norcpm-cf/raw/norcpm-cf-system1/norcpm-cf-system1_hindcast/*0215*/*' # Example for reading in hindcast set. # Output files. OUTPUT_DIR = '/projects/NS9039K/projects/climate_futures/seasonal_forecast/processed/' # Parameter metadata in the form: # {input_name : (parameter_number, long_name, units, variable_file_name, # output_name} PARAMETER_DICT = {'TREFHT' : ('167', '2 metre temperature', 'K', '2m_temperature', 't2m'), 'PSL' : ('151', 'msl', 'Pa', 'mean_sea_level_pressure', 'MSL_GDS0_SFC'), 'U850': ('131', 'U velocity at 850hPa', 'm s**-1', 'u_component_of_wind_850hPa', 'U850_GDS0_ISBL'), 'V850': ('132', 'V velocity at 850hPa', 'm s**-1', 'v_component_of_wind_850hPa', 'V850_GDS0_ISBL'), 'PRECS' : ('128', 'Mean total snowfall rate', 'm of water equivalent s**-1', 'snowfall', 'SF_GDS0_SFC'), 'U10' : ('207', '10 metre wind speed', 'm s**-1', '10m_wind_speed', 'usi'), 'UAS' : ('165', '10 metre U wind component', 'm s**-1', '10m_u_component_of_wind', 'u10'), 'VAS' : ('166', '10 metre V wind component', 'm s**-1', '10m_v_component_of_wind', 'v10'), 'PRECT' : ('228', 'Mean total precipitation rate', 'm s**-1', 'tprate', 'TP_GDS0_SFC'), 'SST' : ('34', 'Sea surface temperature', 'K', 'sea_surface_temperature', 'SSTK_GDS0_SFC')} # ============================================================================ # Functions # ============================================================================ def naive_noleap_to_gregorian(time_points, start_date): """ Converts noleap dates to gregorian without adding leap day. Aligns the first date on the two calendars, and then converts the noleap dates to gregorian dates making the assumption that there are no leap days in the input dates. Parameters ---------- time_points : array_like(float) Numbers that correspond to 'days since' a certain date. dates : array_like(cftime._cftime.DatetimeNoLeap) Returns ------- new_time_points : array_like(float) Numbers that correspond to 'days since 1850-01-01 00:00:00' new_dates : array_like(cftime._cftime.DatetimeGregorian) """ start_point = time_points[0] start_date_gregorian = cftime.datetime(start_date.year, start_date.month, start_date.day, start_date.hour, calendar='gregorian') start_num = cftime.date2num(start_date_gregorian, units='days since 1850-01-01 00:00:00') new_time_points = [start_num + ((i-1) * 0.25) for i in range(0, len(time_points))] new_dates = cftime.num2date(new_time_points, 'days since 1850-01-01 00:00:00', 'gregorian') return new_time_points, new_dates def leap_day(dates): ''' Finds indices of leap days Parameters ---------- dates : list of datetime.datetime A list of dates in the input file. Returns ------- list of dates or None. ''' # Get the day of year for all dates. day_of_year_list = [_.dayofyr for _ in dates] # Check if there is a February 28th in the dates. if 59 in day_of_year_list: if day_of_year_list.count(59) > 4: raise NotImplementedError('More than one leap day in file.') feb28_index = day_of_year_list.index(59) feb28_indices = [feb28_index + i for i in range(0, 4)] feb28_dates = [dates[i] for i in feb28_indices] if calendar.isleap(feb28_dates[0].year): return feb28_indices else: return None else: return None def change_calendar(time_variable, data_variable): # Get the time variable units and calendar. unit_string_in = time_variable.units calendar_in = time_variable.calendar # Check calendar is a 365 day calendar as expected. if calendar_in not in ['365_day', 'noleap']: raise ValueError('The input calendar is not a 365_day calendar,' + \ 'which is expected for NorCPM output.') # Get the time points from the variable. time_points_in = time_variable[:] # Get the field values from the variable. field = np.ma.masked_array(data_variable[:], copy=False) # Convert these into dates. dates_in = cftime.num2date(time_points_in, unit_string_in, calendar_in) start_date_in = dates_in[0] # Use a naive conversion which assumes no leap day (not needed for monthly). time_points_out, dates_out = naive_noleap_to_gregorian( time_points_in, start_date_in) # Check if there is a leap day in the input dates. Find the indices of # the dates which are on the leap day (should be four because six-hourly.) leap_day_indices = leap_day(dates_out) # If there are no leap days, do nothing to the field. if not leap_day_indices: pass # Otherwise make leap day adjustments. else: # Adjust time variable: # -------------------- # Append point to the time_points_out. time_points_out.extend([time_points_out[-1] + i/4.0 for i in range(1,5)]) # And to the dates. dates_out = np.append(dates_out, np.array([dates_out[-1] + \ datetime.timedelta(hours=i*6) for i in range(1, 5)])) # Adjust the field variable: # -------------------------- # Get axis of time. for i, index_val in enumerate(leap_day_indices): time_axis = data_variable.dimensions.index('time') empty_list = [0]*len(data_variable.dimensions) empty_list[time_axis] = 1 feb_index_tuple = tuple( [slice(None) if x == 0 else index_val for x in empty_list]) # Get the fields for February 28th and March 1st. feb28_field = field[feb_index_tuple] mar_index_tuple = tuple( [slice(None) if x == 0 else (index_val + i + 4) for x in empty_list]) mar01_field = field[mar_index_tuple] # Calculate interpolated field. feb29_field = 0.5 * (feb28_field + mar01_field) # Insert the field. field = np.insert(field, index_val + 4, feb29_field, time_axis) return time_points_out, field def create_dimensions(dataset, ntime): dataset.createDimension('time', ntime) dataset.createDimension('latitude', 180) dataset.createDimension('longitude', 360) def create_generic_variables(dataset, forecast_hours, lats, lons, e, compression): # Create a forecast_time1 variable. # --------------------------- dataset.createVariable( 'time', 'i4', ('time',), zlib=True, complevel=compression) dataset.variables['time'].units = 'hours since 1900-01-01 00:00:00.0' dataset.variables['time'].long_name = 'time' dataset.variables['time'].calendar = 'gregorian' dataset.variables['time'][:] = forecast_hours # Create the latitude variable. # ----------------------------- dataset.createVariable( 'latitude', 'f4', ('latitude',), zlib=True, complevel=compression) dataset.variables['latitude'].units = 'degrees_north' dataset.variables['latitude'].long_name = 'latitude' dataset.variables['latitude'][:] = lats # Create the longitude variable. # ----------------------------- dataset.createVariable( 'longitude', 'f4', ('longitude',), zlib=True, complevel=compression) dataset.variables['longitude'].units = 'degrees_east' dataset.variables['longitude'].long_name = 'longitude' dataset.variables['longitude'][:] = lons def create_ensemble_variables(dataset, nmembers, compression): # Create the ensemble variable. # ----------------------------- dataset.createVariable( 'number', 'i4', ('number',), zlib=True, complevel=compression, fill_value=False) dataset.variables['number'].long_name = 'ensemble_member' dataset.variables['number'][:] = np.array(range(0, nmembers)) def create_field_variable(dataset, field, variable_name_out, long_name, parameter_number, units, initial_date, compression): dataset.createVariable( variable_name_out, 'f4', ('time', 'latitude', 'longitude',), zlib=True, fill_value=1.e+20, complevel=compression) dataset.variables[variable_name_out].units = units dataset.variables[variable_name_out].long_name = long_name dataset.variables[variable_name_out].initial_time = initial_date.strftime('%d/%m/%Y %H:%M') dataset.variables[variable_name_out][:] = field def regrid_field(field): # Input grid information. rlat_in = 180/95 rlon_in = 2.5 res_in = (rlon_in, rlat_in) lon_bounds_in = (0 - rlon_in/2, 357.5 + rlon_in/2) lat_bounds_in = (-90 - rlat_in/2 , 90 + rlat_in/2) # Output grid information. res_out = (1, -1) lon_bounds_out = (-180, 180) lat_bounds_out = (90, -90) method = 'conservative' periodic = False # Ensure array is masked, converting NaNs or infs to be masked as True. field = np.ma.masked_invalid(field, copy=False) # Get the length of the ensemble and time coordinates. Nt = field.shape[1] # Set up input and output grids. grid_in = xe.util.grid_2d(lon_bounds_in[0],lon_bounds_in[1], res_in[0], lat_bounds_in[0],lat_bounds_in[1], res_in[1]) grid_out = xe.util.grid_2d(lon_bounds_out[0], lon_bounds_out[1], res_out[0], lat_bounds_out[0], lat_bounds_out[1], res_out[1]) regridder = xe.Regridder(grid_in, grid_out, method, periodic=periodic, reuse_weights=True, weights=GRID_FILE) mask = np.where(field.mask, np.zeros(field.shape), np.ones(field.shape)) # Set up a mask that is 0 for invalid and 1 for valid values. mask = np.where(field.mask, np.zeros(field.shape), np.ones(field.shape)) # Regrid the mask. mask = regridder(mask) # Set values that are more than 50% composed of invalid values to np.nan. mask = 1. / np.where(mask > 0.5, mask, np.nan) # Regridding happens here. field_out = regridder(np.where(field.mask, 0. , field)) * mask lons = np.array(grid_out.lon) lats = np.array(grid_out.lat) # Get 1D versions of latitude and longitude points lons = lons[0, :] lats = lats[:, 0] return field_out, lats, lons def convert_metadata(directory): if directory[-1] != '/': base_directory = os.path.basename(directory) else: base_directory = directory.split('/')[-2] print(f"Processing {directory}...") # Get the initial time when hindcast was started (from filename). # This is used for the filename. last_assimilation_date_string = base_directory.split('_')[2] last_assimilation_date = datetime.datetime.strptime( last_assimilation_date_string, '%Y%m%d') # Get a list of the six-hourly files for this hindcast date (should be 1). six_hourly_file_list = glob.glob(os.path.join(directory, 'atm/hist/', 'norcpm-cf-system1_hindcast_*_mem01-60.cam2.h2.*.nc')) # Check that there is only 1 six-hourly file. if len(six_hourly_file_list) > 1: raise ValueError( 'There is more than one six-hourly file in this directory.') # Shift the last_assimilation_date by one. # Now it is the date of the first valid month, instead of last assimilation. filename_date = last_assimilation_date + relativedelta(months=1) filename_date_string = filename_date.strftime("%Y%m") # Output directory. output_dir = os.path.join(OUTPUT_DIR, 'six-hourly', filename_date_string) if not OVERWRITE_PREVIOUS: if os.path.exists(output_dir): print(f'Skipping directory {output_dir} because it already exists.') return # Create output directory. os.makedirs(output_dir, exist_ok=True) # Print some information for the log file. print(f'Converting directory: {directory}') # Get the initial time when hindcast was started (from filename). initial_date = last_assimilation_date # For each variable in the user defined list. for variable_name_in in VARIABLE_LIST: # Print some information for the log file. print(f'Variable = {variable_name_in}') # Get filename. file = six_hourly_file_list[0] # Load file. rootgrp_in = Dataset(file, 'r') # Get the time variable. time_variable = rootgrp_in.variables['time'] # Get the original field variable. data_variable = rootgrp_in.variables[variable_name_in] # Change the calendar from 365_day to gregorian. time_points, field = change_calendar(time_variable, data_variable) # Convert precipitation and snowfall variable units. if variable_name_in in ['PRECT', 'PRECS']: # Convert from mm/s to mm/day. field = 60 * 60 * 24 * field # Accumulate values field = np.cumsum(field, axis=1) # Get the part of the file name associated with the variable. variable_file_name = PARAMETER_DICT[variable_name_in][3] if TRIM: # Find the start day of the next month. initial_date = cftime.DatetimeGregorian( initial_date.year + int(initial_date.month / 12), ((initial_date.month % 12) + 1), 1) initial_day = (initial_date - datetime.datetime(1850, 1, 1)).days end_day = initial_day + 125 time_points = np.array(time_points) trim_mask = np.logical_and(time_points >= initial_day, time_points < end_day) # Trim time. time_points = time_points[trim_mask] # Convert time points into days since 1900-01-01 00000 forecast_hours = [int((((datetime.datetime(1850, 1, 1) + datetime.timedelta(hours=(x * 24))) - datetime.datetime(1900, 1, 1)).total_seconds()) / 3600) for x in time_points] nmembers = field.shape[0] for e in range(0, nmembers): # Regrid the field data. ifield, lats, lons = regrid_field(field[e, :, :, :]) # Trim the data: if TRIM: ifield = ifield[trim_mask, :, :] # Get the number of time values.. ntime = ifield.shape[0] # Create output file. ioutput_file = os.path.join(output_dir, f'efile_{variable_file_name}_bccr_1_{initial_date.year:04d}_{initial_date.month:02d}_{e:02d}.nc4') # Name of the output file. rootgrp_out = Dataset(ioutput_file, 'w', format='NETCDF4') # Create standard dimensions. create_dimensions(dataset=rootgrp_out, ntime=ntime) # Create standard variables. create_generic_variables(rootgrp_out, forecast_hours, lats, lons, e, COMPRESSION) # Create field variable. create_field_variable( dataset=rootgrp_out, field=ifield, variable_name_out=PARAMETER_DICT[variable_name_in][4], long_name=PARAMETER_DICT[variable_name_in][1], parameter_number=PARAMETER_DICT[variable_name_in][0], units=PARAMETER_DICT[variable_name_in][2], initial_date=initial_date, compression=COMPRESSION) # Close ensemble member output file. rootgrp_out.close() # Combine the ensemble member output files. output_file = os.path.join(output_dir, f'{variable_file_name}_bccr_1_{initial_date.year:04d}_{initial_date.month:02d}_01.nc') input_files = os.path.join(output_dir, f'efile_{variable_file_name}_bccr_1_{initial_date.year:04d}_{initial_date.month:02d}_*.nc4') os.system(f"ncecat {input_files} {output_file}") # Remove the individual ensemble member output files. os.system(f"rm {input_files}") os.system(f"ncrename -d record,number {output_file}") # Open the final output file. final_rootgrp_out = Dataset(output_file, 'r+', format='NETCDF4') # Add the ensemble variable. create_ensemble_variables(final_rootgrp_out, nmembers, COMPRESSION) # Close the final output file. rootgrp_out.close() # Close input file. rootgrp_in.close() # ============================================================================ # Main # ============================================================================ if __name__ == '__main__': start_time = time.time() # Apply conversion to a list of directories. # ------------------------------------------ if NCPU > 1: # Parallel. (Parallelised over input files, so over hindcast start date). directory_list = glob.glob(INGLOB) with Pool(NCPU) as p: p.map(convert_metadata, directory_list) else: # Serial. directory_list = glob.glob(INGLOB) for directory in directory_list: convert_metadata(directory) print("--- %s seconds ---" % (time.time() - start_time))