import gzip
import io
import json
import os
import shutil
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Union
import pandas as pd
import requests
import xarray as xr
from .._base_downloaders import BaseDownloader
class NOAADownloader(BaseDownloader):
"""
This is the main class to download and read data from NOAA.
Attributes
----------
config : dict
The configuration for NOAA data sources loaded from JSON file.
base_path_to_download : Path
Base path where the data is stored.
debug : bool
Whether to run in debug mode.
Examples
--------
.. jupyter-execute::
from bluemath_tk.downloaders.noaa.noaa_downloader import NOAADownloader
noaa_downloader = NOAADownloader(
base_path_to_download="/path/to/NOAA/", # Will be created if not available
debug=True,
check=True,
)
# Download buoy bulk parameters and load DataFrame
result = noaa_downloader.download_data(
data_type="bulk_parameters",
buoy_id="41001",
years=[2020, 2021, 2022],
load_df=True
)
print(result)
"""
# Load the NOAA data-sources configuration that ships alongside this module
config = json.loads(
(Path(__file__).parent / "NOAA_config.json").read_text()
)
def __init__(
self,
base_path_to_download: str,
debug: bool = True,
check: bool = False,
) -> None:
"""
Initialize the NOAA downloader.
Parameters
----------
base_path_to_download : str
The base path to download the data to.
debug : bool, optional
Whether to run in debug mode. Default is True.
check : bool, optional
Whether to just check the data. Default is False.
"""
super().__init__(
base_path_to_download=base_path_to_download, debug=debug, check=check
)
self.set_logger_name("NOAADownloader", level="DEBUG" if debug else "INFO")
if not self.check:
self.logger.info("---- DOWNLOADING NOAA DATA ----")
else:
self.logger.info("---- CHECKING NOAA DATA ----")
@property
def datasets(self) -> dict:
return self.config["datasets"]
@property
def data_types(self) -> dict:
return self.config["data_types"]
def list_data_types(self) -> List[str]:
"""
Lists the available data types.
Returns
-------
List[str]
The list of available data types.
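Examples
--------
The available types come from NOAA_config.json; with the default
configuration these include, for example:
>>> noaa_downloader.list_data_types()  # doctest: +SKIP
['bulk_parameters', 'wave_spectra', 'directional_spectra', 'wind_forecast']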
"""
return list(self.data_types.keys())
def list_datasets(self) -> List[str]:
"""
Lists the available datasets.
Returns
-------
List[str]
The list of available datasets.
"""
return list(self.datasets.keys())
def show_markdown_table(self) -> None:
"""
Create a Markdown table from the configuration dictionary and print it.
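Examples
--------
Prints one Markdown row per configured data type (rows elided here):
>>> noaa_downloader.show_markdown_table()  # doctest: +SKIP
| name | long_name | description | dataset |
| ---- | --------- | ----------- | ------- |
...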
"""
# Define the table headers
headers = ["name", "long_name", "description", "dataset"]
header_line = "| " + " | ".join(headers) + " |"
separator_line = (
"| " + " | ".join(["-" * len(header) for header in headers]) + " |"
)
# Initialize the table with headers
table_lines = [header_line, separator_line]
# Add rows for each data type
for data_type_name, data_type_info in self.data_types.items():
name = data_type_info.get("name", "")
long_name = data_type_info.get("long_name", "")
description = data_type_info.get("description", "")
dataset = data_type_info.get("dataset", "")
row = f"| {name} | {long_name} | {description} | {dataset} |"
table_lines.append(row)
# Print the table
print("\n".join(table_lines))
def download_data(
self, data_type: str, load_df: bool = False, **kwargs
) -> Union[pd.DataFrame, xr.Dataset, str]:
"""
Downloads the data for the specified data type.
Parameters
----------
data_type : str
The data type to download.
- 'bulk_parameters'
- 'wave_spectra'
- 'directional_spectra'
- 'wind_forecast'
load_df : bool, optional
Whether to load and return the DataFrame after downloading.
Default is False.
If True and multiple years are specified, all years will be combined
into a single DataFrame.
**kwargs
Additional keyword arguments specific to each data type.
Returns
-------
Union[pd.DataFrame, xr.Dataset, str]
Downloaded data or status message.
Raises
------
ValueError
If the data type is not supported.
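Examples
--------
A minimal sketch for downloading wave spectra (the buoy ID and years
are illustrative):
>>> status = noaa_downloader.download_data(data_type="wave_spectra", buoy_id="41001", years=[2020])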
"""
if data_type not in self.data_types:
raise ValueError(
f"Data type {data_type} not supported. Available types: {self.list_data_types()}"
)
data_type_config = self.data_types[data_type]
dataset_config = self.datasets[data_type_config["dataset"]]
result = None
if data_type == "bulk_parameters":
result = self._download_bulk_parameters(
data_type_config, dataset_config, **kwargs
)
if load_df:
buoy_id = kwargs.get("buoy_id")
years = kwargs.get("years", [])
if years:
result = self.read_bulk_parameters(buoy_id, years)
elif data_type == "wave_spectra":
result = self._download_wave_spectra(
data_type_config, dataset_config, **kwargs
)
if load_df:
buoy_id = kwargs.get("buoy_id")
years = kwargs.get("years", [])
if years:
result = self.read_wave_spectra(buoy_id, years)
elif data_type == "directional_spectra":
result = self._download_directional_spectra(
data_type_config, dataset_config, **kwargs
)
if load_df:
buoy_id = kwargs.get("buoy_id")
years = kwargs.get("years", [])
if years:
result = self.read_directional_spectra(buoy_id, years)
elif data_type == "wind_forecast":
result = self._download_wind_forecast(
data_type_config, dataset_config, **kwargs
)
else:
raise ValueError(f"Download for data type {data_type} not implemented")
return result
def _download_bulk_parameters(
self,
data_type_config: dict,
dataset_config: dict,
buoy_id: str,
years: List[int],
**kwargs,
) -> Union[pd.DataFrame, str, None]:
"""
Download bulk parameters for a specific buoy and years.
Parameters
----------
data_type_config : dict
The configuration for the data type.
dataset_config : dict
The configuration for the dataset.
buoy_id : str
The buoy ID.
years : List[int]
The years to download data for.
Returns
-------
Union[pd.DataFrame, str, None]
The combined DataFrame (when check is True), a status message once the CSV has been written, or None if no data was found.
"""
self.logger.info(
f"Downloading bulk parameters for buoy {buoy_id}, years {years}"
)
all_data = []
base_url = dataset_config["base_url"]
for year in years:
# Try main URL first, then fallbacks
urls = [
f"{base_url}/{data_type_config['url_pattern'].format(buoy_id=buoy_id, year=year)}"
]
for fallback in data_type_config.get("fallback_urls", []):
urls.append(f"{base_url}/{fallback.format(buoy_id=buoy_id, year=year)}")
df = self._download_single_year_bulk(
urls, data_type_config["columns"], year
)
if df is not None:
all_data.append(df)
self.logger.info(f"Buoy {buoy_id}: Data found for year {year}")
else:
self.logger.warning(
f"Buoy {buoy_id}: No data available for year {year}"
)
if all_data:
# Combine all years
combined_df = pd.concat(all_data, ignore_index=True)
combined_df = combined_df.sort_values(["YYYY", "MM", "DD", "hh"])
# Save to CSV if not in check mode
if not self.check:
buoy_dir = os.path.join(
self.base_path_to_download, "buoy_data", buoy_id
)
os.makedirs(buoy_dir, exist_ok=True)
output_file = os.path.join(
buoy_dir, f"buoy_{buoy_id}_bulk_parameters.csv"
)
combined_df.to_csv(output_file, index=False)
self.logger.info(f"Data saved to {output_file}")
return f"Data saved to {output_file}"
return combined_df
else:
self.logger.error(f"No data found for buoy {buoy_id}")
return None
def _download_single_year_bulk(
self, urls: List[str], columns: List[str], year: int
) -> Optional[pd.DataFrame]:
"""
Download and parse bulk parameters for a single year.
Parameters
----------
urls : List[str]
The URLs to download the data from.
columns : List[str]
The columns to read from the data.
year : int
The year to download data for.
Returns
-------
Optional[pd.DataFrame]
The downloaded data.
"""
for url in urls:
try:
response = requests.get(url)
if response.status_code == 200:
content = gzip.decompress(response.content).decode("utf-8")
# Skip the header rows and read the data
data = []
lines = content.split("\n")[2:] # Skip first two lines (headers)
# Check format by looking at the first data line
first_line = next(line for line in lines if line.strip())
cols = first_line.split()
# Determine format based on number of columns and year format
has_minutes = len(cols) == 18 # Post-2012 format has 18 columns
for line in lines:
if line.strip():
parts = line.split()
if parts:
# Convert 2-digit year to 4 digits if needed
if int(parts[0]) < 100:
parts[0] = str(int(parts[0]) + 1900)
# Add minutes column if it doesn't exist
if not has_minutes:
parts.insert(4, "00")
data.append(" ".join(parts))
# Read the modified data
df = pd.read_csv(
io.StringIO("\n".join(data)),
sep=r"\s+",
names=columns,
)
# Validate dates
valid_dates = (
(df["MM"] >= 1)
& (df["MM"] <= 12)
& (df["DD"] >= 1)
& (df["DD"] <= 31)
& (df["hh"] >= 0)
& (df["hh"] <= 23)
& (df["mm"] >= 0)
& (df["mm"] <= 59)
)
df = df[valid_dates].copy()
if len(df) > 0:
return df
except Exception as e:
self.logger.debug(f"Failed to download from {url}: {e}")
continue
return None
def read_bulk_parameters(
self, buoy_id: str, years: Union[int, List[int]]
) -> Optional[pd.DataFrame]:
"""
Read bulk parameters for a specific buoy and year(s).
Parameters
----------
buoy_id : str
The buoy ID.
years : Union[int, List[int]]
The year(s) to read data for. Can be a single year or a list of years.
Returns
-------
Optional[pd.DataFrame]
DataFrame containing the bulk parameters, or None if data not found.
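Examples
--------
A minimal sketch; it assumes the bulk parameters were previously
downloaded for this buoy:
>>> df = noaa_downloader.read_bulk_parameters(buoy_id="41001", years=[2020, 2021])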
"""
if isinstance(years, int):
years = [years]
# All years are stored in a single combined CSV, so read it once and
# filter to the requested years instead of re-reading it per year.
file_path = os.path.join(
self.base_path_to_download,
"buoy_data",
buoy_id,
f"buoy_{buoy_id}_bulk_parameters.csv",
)
try:
df = pd.read_csv(file_path)
except FileNotFoundError:
self.logger.error(f"No bulk parameters file found for buoy {buoy_id}")
return None
df["datetime"] = pd.to_datetime(
df["YYYY"].astype(str)
+ "-"
+ df["MM"].astype(str).str.zfill(2)
+ "-"
+ df["DD"].astype(str).str.zfill(2)
+ " "
+ df["hh"].astype(str).str.zfill(2)
+ ":"
+ df["mm"].astype(str).str.zfill(2)
)
df = df[df["YYYY"].isin(years)]
if not df.empty:
return df.sort_values("datetime")
self.logger.error(f"No bulk parameters found for buoy {buoy_id} in years {years}")
return None
def _download_wave_spectra(
self,
data_type_config: dict,
dataset_config: dict,
buoy_id: str,
years: List[int],
**kwargs,
) -> str:
"""
Download wave spectra data for a specific buoy.
Parameters
----------
data_type_config : dict
The configuration for the data type.
dataset_config : dict
The configuration for the dataset.
buoy_id : str
The buoy ID.
years : List[int]
The years to download data for.
Returns
-------
str
The status message.
"""
self.logger.info(f"Downloading wave spectra for buoy {buoy_id}, years {years}")
base_url = dataset_config["base_url"]
buoy_dir = os.path.join(
self.base_path_to_download, "buoy_data", buoy_id, "wave_spectra"
)
if not self.check:
os.makedirs(buoy_dir, exist_ok=True)
downloaded_files = []
for year in years:
url = f"{base_url}/{data_type_config['url_pattern'].format(buoy_id=buoy_id, year=year)}"
try:
# Read the data
df = pd.read_csv(
url,
compression="gzip",
sep=r"\s+",
na_values=["MM", "99.00", "999.0"],
)
# Skip if empty or invalid data
if df.empty or len(df.columns) < 5:
self.logger.warning(f"No valid data for {buoy_id} - {year}")
continue
# Save the raw yearly file; datetime parsing is handled in read_wave_spectra
if not self.check:
output_file = os.path.join(
buoy_dir, f"buoy_{buoy_id}_spectra_{year}.csv"
)
df.to_csv(output_file, index=False)
downloaded_files.append(output_file)
self.logger.info(f"Successfully saved data for {buoy_id} - {year}")
except Exception as e:
self.logger.warning(f"No data found for: {buoy_id} - {year}: {e}")
continue
return f"Downloaded {len(downloaded_files)} files for wave spectra"
def read_wave_spectra(
self, buoy_id: str, years: Union[int, List[int]]
) -> Optional[pd.DataFrame]:
"""
Read wave spectra data for a specific buoy and year(s).
Parameters
----------
buoy_id : str
The buoy ID.
years : Union[int, List[int]]
The year(s) to read data for. Can be a single year or a list of years.
Returns
-------
Optional[pd.DataFrame]
DataFrame containing the wave spectra, or None if data not found.
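Examples
--------
A minimal sketch; it assumes the yearly spectra files were previously
downloaded for this buoy:
>>> spectra = noaa_downloader.read_wave_spectra(buoy_id="41001", years=2020)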
"""
if isinstance(years, int):
years = [years]
all_data = []
for year in years:
file_path = os.path.join(
self.base_path_to_download,
"buoy_data",
buoy_id,
"wave_spectra",
f"buoy_{buoy_id}_spectra_{year}.csv",
)
try:
df = pd.read_csv(file_path)
try:
df["date"] = pd.to_datetime(
df[["YYYY", "MM", "DD", "hh"]].rename(
columns={
"YYYY": "year",
"MM": "month",
"DD": "day",
"hh": "hour",
}
)
)
df.drop(columns=["YYYY", "MM", "DD", "hh"], inplace=True)
except Exception as _e:
df["date"] = pd.to_datetime(
df[["#YY", "MM", "DD", "hh", "mm"]].rename(
columns={
"#YY": "year",
"MM": "month",
"DD": "day",
"hh": "hour",
"mm": "minute",
}
)
)
df.drop(columns=["#YY", "MM", "DD", "hh", "mm"], inplace=True)
df.set_index("date", inplace=True)
all_data.append(df)
except FileNotFoundError:
self.logger.error(
f"No wave spectra file found for buoy {buoy_id} year {year}"
)
if all_data:
return pd.concat(all_data).sort_index()
return None
def _download_directional_spectra(
self,
data_type_config: dict,
dataset_config: dict,
buoy_id: str,
years: List[int],
**kwargs,
) -> str:
"""
Download directional wave spectra coefficients.
Parameters
----------
data_type_config : dict
The configuration for the data type.
dataset_config : dict
The configuration for the dataset.
buoy_id : str
The buoy ID.
years : List[int]
The years to download data for.
Returns
-------
str
The status message.
"""
self.logger.info(
f"Downloading directional spectra for buoy {buoy_id}, years {years}"
)
base_url = dataset_config["base_url"]
coefficients = data_type_config["coefficients"]
buoy_dir = os.path.join(
self.base_path_to_download, "buoy_data", buoy_id, "directional_spectra"
)
if not self.check:
os.makedirs(buoy_dir, exist_ok=True)
downloaded_files = []
for year in years:
for coef, info in coefficients.items():
filename = f"{buoy_id}{coef}{year}.txt.gz"
url = f"{base_url}/{info['url_pattern'].format(buoy_id=buoy_id, year=year)}"
if not self.check:
save_path = os.path.join(buoy_dir, filename)
try:
self.logger.debug(
f"Downloading {info['name']} data for {year}..."
)
response = requests.get(url, stream=True)
response.raise_for_status()
# Save the compressed file
with open(save_path, "wb") as f:
shutil.copyfileobj(response.raw, f)
downloaded_files.append(save_path)
self.logger.info(f"Successfully downloaded {filename}")
except requests.exceptions.RequestException as e:
self.logger.warning(f"Error downloading {filename}: {e}")
continue
return f"Downloaded {len(downloaded_files)} coefficient files"
def read_directional_spectra(
self, buoy_id: str, years: Union[int, List[int]]
) -> Tuple[Optional[pd.DataFrame], ...]:
"""
Read directional spectra data for a specific buoy and year(s).
Parameters
----------
buoy_id : str
The buoy ID
years : Union[int, List[int]]
The year(s) to read data for. Can be a single year or a list of years.
Returns
-------
Tuple[Optional[pd.DataFrame], ...]
Tuple containing DataFrames for alpha1, alpha2, r1, r2, and c11,
or None for each if data not found.
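Examples
--------
A minimal sketch showing how the returned tuple unpacks (buoy ID and
years are illustrative; the coefficient files must already be downloaded):
>>> alpha1, alpha2, r1, r2, c11 = noaa_downloader.read_directional_spectra(buoy_id="41001", years=[2020, 2021])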
"""
if isinstance(years, int):
years = [years]
results = {
"alpha1": [],
"alpha2": [],
"r1": [],
"r2": [],
"c11": [],
}
for year in years:
dir_path = os.path.join(
self.base_path_to_download,
"buoy_data",
buoy_id,
"directional_spectra",
)
files = {
"alpha1": f"{buoy_id}d{year}.txt.gz",
"alpha2": f"{buoy_id}i{year}.txt.gz",
"r1": f"{buoy_id}j{year}.txt.gz",
"r2": f"{buoy_id}k{year}.txt.gz",
"c11": f"{buoy_id}w{year}.txt.gz",
}
for name, filename in files.items():
file_path = os.path.join(dir_path, filename)
try:
df = self._read_directional_file(file_path)
if df is not None:
results[name].append(df)
except FileNotFoundError:
self.logger.error(
f"No {name} file found for buoy {buoy_id} year {year}"
)
# Combine DataFrames for each coefficient if available
final_results = {}
for name, dfs in results.items():
if dfs:
final_results[name] = pd.concat(dfs).sort_index()
else:
final_results[name] = None
return (
final_results["alpha1"],
final_results["alpha2"],
final_results["r1"],
final_results["r2"],
final_results["c11"],
)
def _read_directional_file(self, file_path: Path) -> Optional[pd.DataFrame]:
"""
Read a directional spectra file and return DataFrame with datetime index.
Parameters
----------
file_path : Path
Path to the file to read
Returns
-------
Optional[pd.DataFrame]
DataFrame containing the directional spectra data, or None if data not found
"""
self.logger.debug(f"Reading file: {file_path}")
try:
with gzip.open(file_path, "rt") as f:
# Read header lines until we find the frequencies
header_lines = []
while True:
line = f.readline().strip()
if not line.startswith("#") or not line.startswith("YYYY"):
break
header_lines.append(line)
# Parse frequencies
header = " ".join(header_lines)
try:
freqs = [float(x) for x in header.split()[5:]]
self.logger.debug(f"Found {len(freqs)} frequencies")
except (ValueError, IndexError) as e:
self.logger.error(f"Error parsing frequencies: {e}")
return None
# Read data
data = []
dates = []
# Process the first line
parts = line.strip().split()
if len(parts) >= 5:
try:
year, month, day, hour, minute = map(int, parts[:5])
values = [float(x) for x in parts[5:]]
if len(values) == len(freqs):
dates.append(datetime(year, month, day, hour, minute))
data.append(values)
except (ValueError, IndexError) as e:
self.logger.error(f"Error parsing line: {e}")
# Read remaining lines
for line in f:
parts = line.strip().split()
if len(parts) >= 5:
try:
year, month, day, hour, minute = map(int, parts[:5])
values = [float(x) for x in parts[5:]]
if len(values) == len(freqs):
dates.append(datetime(year, month, day, hour, minute))
data.append(values)
except (ValueError, IndexError) as e:
self.logger.error(f"Error parsing line: {e}")
continue
if not data:
self.logger.warning("No valid data points found in file")
return None
df = pd.DataFrame(data, index=dates, columns=freqs)
self.logger.debug(f"Created DataFrame with shape: {df.shape}")
return df
except Exception as e:
self.logger.error(f"Error reading file {file_path}: {str(e)}")
return None
def _download_wind_forecast(
self,
data_type_config: dict,
dataset_config: dict,
date: Optional[str] = None,
region: Optional[List[float]] = None,
**kwargs,
) -> xr.Dataset:
"""
Download NOAA GFS wind forecast data.
Parameters
----------
data_type_config : dict
The configuration for the data type.
dataset_config : dict
The configuration for the dataset.
date : str, optional
The date to download data for, in YYYYMMDD format. Defaults to today.
region : List[float], optional
The region bounds; currently only used to name the locally stored NetCDF file.
Returns
-------
xr.Dataset
The downloaded data.
Notes
-----
- This will be DEPRECATED in the future.
"""
if date is None:
date = datetime.today().strftime("%Y%m%d")
self.logger.info(f"Downloading wind forecast for date {date}")
url_base = dataset_config["base_url"]
dbn = "gfs_0p25_1hr"
url = f"{url_base}/gfs{date}/{dbn}_00z"
# File path for local storage
forecast_dir = os.path.join(self.base_path_to_download, "wind_forecast")
if not self.check:
os.makedirs(forecast_dir, exist_ok=True)
file_path = os.path.join(
forecast_dir, f"{date}_{'_'.join(map(str, region))}.nc"
)
# Check if file exists
if os.path.isfile(file_path):
self.logger.info(
f"File already exists: {file_path}. Loading from local storage."
)
data = xr.open_dataset(file_path)
else:
if self.check:
self.logger.info(f"File would be downloaded to: {file_path}")
return None
self.logger.info(f"Downloading and cropping forecast data from: {url}")
# Open the remote dataset; only the configured wind variables are kept below
data = xr.open_dataset(url)
# Select only wind data
variables = data_type_config["variables"]
data_select = data[variables]
self.logger.info(f"Storing local copy at: {file_path}")
data_select.to_netcdf(file_path)
data = data_select
# Create output dataset with renamed variables
output_vars = data_type_config["output_variables"]
wind_data_forecast = xr.Dataset(
{
output_vars["u10"]: (
("time", "lat", "lon"),
data[data_type_config["variables"][0]].values,
),
output_vars["v10"]: (
("time", "lat", "lon"),
data[data_type_config["variables"][1]].values,
),
},
coords={
"time": data.time.values,
"lat": data.lat.values,
"lon": data.lon.values,
},
)
wind_data_forecast["time"] = wind_data_forecast.time.dt.round("min")
return wind_data_forecast