Hello everyone,
I would like to share with you my recent experience with gridding data and how using multiprocessing on a High-Performance Computing (HPC) cluster has dramatically reduced my processing time. In fact, I was able to grid 54 files of NEXRAD data in less than 10 minutes.
To achieve this, I used the multiprocessing library in Python, which allows multiple processes to run simultaneously, taking advantage of the cluster’s computing power. I want to emphasize that I did not try this approach on my local machine, and it is essential to have access to a powerful computing resource to benefit from multiprocessing.
import warnings
warnings.filterwarnings("ignore")
import pyart
import numpy as np
from matplotlib import pyplot as plt
import glob, os
from datetime import datetime
import time
import sys
import multiprocessing as mp
# Define a function that takes a file name and writes the GRID file
def process_file(nexrad_dir, rfile):
print(f'Reading file: {rfile.split("/")[-1]}')
radar = pyart.io.read(rfile)
radar = filter_data(radar, "DBZ", "VEL")
max_rng = 360.*1e3 # np.ceil(radar.range['data'].max())
grid = pyart.map.grid_from_radars(radar,(61,721,721),
((0.,15000.),(-max_rng, max_rng),(-max_rng, max_rng)),
weighting_function='Barnes2',
fields=['REF', 'VEL', 'ZDR', 'RHO']
)
print(f'Deleting: {rfile.split("/")[-1]}')
del radar
# Extract the NEXRAD file number from the file name
file_number = rfile.split('NEXRAD')[-1]
# Construct the full path to the GRID file using os.path.join()
grid_file_path = os.path.join(nexrad_dir, f'GRID{file_number}')
print(f'Saving Grid: {rfile.split("/")[-1]}')
# Write the GRID file using pyart.io.write_grid()
pyart.io.write_grid(filename=grid_file_path, grid=grid)
def process_file_wrapper(args):
return process_file(*args)
nexrad_dir = "/xxxxxxx/xxxxx/xxxxx/xxxx/obsdata/2022/NEXRAD/"
data_files = get_nexrad_data_files("20220330220000", "20220331040000")
# Create a pool of worker processes with 100 CPUs
pool = mp.Pool(processes=100)
# Map the data files to the process_file function to process them in parallel
results = pool.map(process_file_wrapper, [(nexrad_dir, f) for f in data_files])
# Close the pool of worker processes
pool.close()
pool.join()