import gzip
import io
import json
import os
import shutil
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Union
import pandas as pd
import requests
import xarray as xr
from .._base_downloaders import BaseDownloader
from .._download_result import DownloadResult
def read_bulk_parameters(
base_path: str, buoy_id: str, years: Union[int, List[int]]
) -> Optional[pd.DataFrame]:
"""
Read bulk parameters for a specific buoy and year(s).
Parameters
----------
base_path : str
Base path where the data is stored.
buoy_id : str
The buoy ID.
years : Union[int, List[int]]
The year(s) to read data for. Can be a single year or a list of years.
Returns
-------
Optional[pd.DataFrame]
DataFrame containing the bulk parameters, or None if data not found.
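    Examples
    --------
    Minimal usage sketch; the path and buoy ID below are placeholders, and the
    CSV is assumed to have been downloaded beforehand with NOAADownloader.

    >>> df = read_bulk_parameters("./noaa_data", "41001", [2022, 2023])
    >>> df.set_index("datetime").head()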
"""
    if isinstance(years, int):
        years = [years]
    file_path = os.path.join(
        base_path,
        "NDBC",
        "buoy_data",
        buoy_id,
        f"buoy_{buoy_id}_bulk_parameters.csv",
    )
    try:
        df = pd.read_csv(file_path)
    except FileNotFoundError:
        print(f"No bulk parameters file found for buoy {buoy_id}")
        return None
    # The CSV holds every downloaded year, so keep only the requested ones
    df = df[df["YYYY"].isin(years)].copy()
    if df.empty:
        print(f"No bulk parameters found for buoy {buoy_id} in years {years}")
        return None
    df["datetime"] = pd.to_datetime(
        df["YYYY"].astype(str)
        + "-"
        + df["MM"].astype(str).str.zfill(2)
        + "-"
        + df["DD"].astype(str).str.zfill(2)
        + " "
        + df["hh"].astype(str).str.zfill(2)
        + ":"
        + df["mm"].astype(str).str.zfill(2)
    )
    return df.sort_values("datetime")
def read_wave_spectra(
base_path: str, buoy_id: str, years: Union[int, List[int]]
) -> Optional[pd.DataFrame]:
"""
Read wave spectra data for a specific buoy and year(s).
Parameters
----------
base_path : str
Base path where the data is stored.
buoy_id : str
The buoy ID.
years : Union[int, List[int]]
The year(s) to read data for. Can be a single year or a list of years.
Returns
-------
Optional[pd.DataFrame]
        DataFrame containing the wave spectra, or None if data not found.
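    Examples
    --------
    Illustrative sketch; assumes the spectra CSV for the requested year was
    already downloaded below "./noaa_data" (path and buoy ID are placeholders).

    >>> spectra = read_wave_spectra("./noaa_data", "41001", 2023)
    >>> spectra.head()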
"""
if isinstance(years, int):
years = [years]
all_data = []
for year in years:
file_path = os.path.join(
base_path,
"NDBC",
"buoy_data",
buoy_id,
"wave_spectra",
f"buoy_{buoy_id}_spectra_{year}.csv",
)
try:
df = pd.read_csv(file_path)
try:
df["date"] = pd.to_datetime(
df[["YYYY", "MM", "DD", "hh"]].rename(
columns={
"YYYY": "year",
"MM": "month",
"DD": "day",
"hh": "hour",
}
)
)
df.drop(columns=["YYYY", "MM", "DD", "hh"], inplace=True)
except Exception:
df["date"] = pd.to_datetime(
df[["#YY", "MM", "DD", "hh", "mm"]].rename(
columns={
"#YY": "year",
"MM": "month",
"DD": "day",
"hh": "hour",
"mm": "minute",
}
)
)
df.drop(columns=["#YY", "MM", "DD", "hh", "mm"], inplace=True)
df.set_index("date", inplace=True)
all_data.append(df)
except FileNotFoundError:
print(f"No wave spectra file found for buoy {buoy_id} year {year}")
if all_data:
return pd.concat(all_data).sort_index()
return None
def _read_directional_file(file_path: Union[str, Path]) -> Optional[pd.DataFrame]:
"""
Read a directional spectra file and return DataFrame with datetime index.
Parameters
----------
    file_path : Union[str, Path]
        Path to the gzipped coefficient file to read.
Returns
-------
Optional[pd.DataFrame]
        DataFrame containing the directional spectra data, or None if data not found.
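    Notes
    -----
    The expected layout (inferred from the parsing below) is the gzipped NDBC
    coefficient format: a header line whose tokens after the five date fields
    are the frequencies, followed by one row per timestamp with one value per
    frequency, e.g.::

        #YY  MM DD hh mm  .0200  .0325  .0375 ...
        2023 01 01 00 40  156.0  134.0  121.0 ...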
"""
print(f"Reading file: {file_path}")
try:
with gzip.open(file_path, "rt") as f:
# Read header lines until we find the frequencies
header_lines = []
while True:
line = f.readline().strip()
if not line.startswith("#") and not line.startswith("YYYY"):
break
header_lines.append(line)
# Parse frequencies
header = " ".join(header_lines)
try:
freqs = [float(x) for x in header.split()[5:]]
print(f"Found {len(freqs)} frequencies")
except (ValueError, IndexError) as e:
print(f"Error parsing frequencies: {e}")
return None
# Read data
data = []
dates = []
# Process the first line
parts = line.strip().split()
if len(parts) >= 5:
try:
year, month, day, hour, minute = map(int, parts[:5])
values = [float(x) for x in parts[5:]]
if len(values) == len(freqs):
dates.append(datetime(year, month, day, hour, minute))
data.append(values)
except (ValueError, IndexError) as e:
print(f"Error parsing line: {e}")
# Read remaining lines
for line in f:
parts = line.strip().split()
if len(parts) >= 5:
try:
year, month, day, hour, minute = map(int, parts[:5])
values = [float(x) for x in parts[5:]]
if len(values) == len(freqs):
dates.append(datetime(year, month, day, hour, minute))
data.append(values)
except (ValueError, IndexError) as e:
print(f"Error parsing line: {e}")
continue
if not data:
print("No valid data points found in file")
return None
df = pd.DataFrame(data, index=dates, columns=freqs)
print(f"Created DataFrame with shape: {df.shape}")
return df
    except FileNotFoundError:
        # Re-raise so the caller can report which coefficient file is missing
        raise
    except Exception as e:
        print(f"Error reading file {file_path}: {e}")
        return None
def read_directional_spectra(
base_path: str, buoy_id: str, years: Union[int, List[int]]
) -> Tuple[Optional[pd.DataFrame], ...]:
"""
Read directional spectra data for a specific buoy and year(s).
Parameters
----------
base_path : str
Base path where the data is stored.
buoy_id : str
        The buoy ID.
years : Union[int, List[int]]
The year(s) to read data for. Can be a single year or a list of years.
Returns
-------
Tuple[Optional[pd.DataFrame], ...]
Tuple containing DataFrames for alpha1, alpha2, r1, r2, and c11,
        or None for each if data not found.
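    Examples
    --------
    Illustrative sketch; assumes the five coefficient files for the buoy were
    already downloaded below "./noaa_data" (path and buoy ID are placeholders).

    >>> alpha1, alpha2, r1, r2, c11 = read_directional_spectra(
    ...     "./noaa_data", "41001", 2023
    ... )
    >>> c11.head()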
"""
if isinstance(years, int):
years = [years]
results = {
"alpha1": [],
"alpha2": [],
"r1": [],
"r2": [],
"c11": [],
}
for year in years:
dir_path = os.path.join(
base_path,
"NDBC",
"buoy_data",
buoy_id,
"directional_spectra",
)
files = {
"alpha1": f"{buoy_id}d{year}.txt.gz",
"alpha2": f"{buoy_id}i{year}.txt.gz",
"r1": f"{buoy_id}j{year}.txt.gz",
"r2": f"{buoy_id}k{year}.txt.gz",
"c11": f"{buoy_id}w{year}.txt.gz",
}
for name, filename in files.items():
file_path = os.path.join(dir_path, filename)
try:
df = _read_directional_file(file_path)
if df is not None:
results[name].append(df)
except FileNotFoundError:
print(f"No {name} file found for buoy {buoy_id} year {year}")
# Combine DataFrames for each coefficient if available
final_results = {}
for name, dfs in results.items():
if dfs:
final_results[name] = pd.concat(dfs).sort_index()
else:
final_results[name] = None
return (
final_results["alpha1"],
final_results["alpha2"],
final_results["r1"],
final_results["r2"],
final_results["c11"],
)
class NOAADownloader(BaseDownloader):
"""
    Main class to download data from NOAA. Currently supports the NDBC
    product, covering buoy bulk parameters, wave spectra, directional
    spectra, and GFS wind forecasts.
Examples
--------
>>> downloader = NOAADownloader(
... product="NDBC",
... base_path_to_download="./noaa_data",
... debug=True
... )
>>> result = downloader.download_data(
... data_type="bulk_parameters",
... buoy_id="41001",
... years=[2023],
... dry_run=False
... )
>>> print(result)
"""
    products_configs = {
        "NDBC": json.loads(
            (Path(__file__).parent / "NDBC" / "NDBC_config.json").read_text()
        )
    }
def __init__(
self,
product: str,
base_path_to_download: str,
debug: bool = True,
) -> None:
"""
Initialize the NOAA downloader.
Parameters
----------
product : str
The product to download data from. Currently only NDBC is supported.
base_path_to_download : str
The base path to download the data to.
debug : bool, optional
Whether to run in debug mode. Default is True.
Raises
------
ValueError
If the product configuration is not found.
"""
super().__init__(
product=product, base_path_to_download=base_path_to_download, debug=debug
)
self._product_config = self.products_configs.get(product)
if self._product_config is None:
available_products = list(self.products_configs.keys())
raise ValueError(
f"Product '{product}' not found. Available: {available_products}"
)
self.set_logger_name(
f"NOAADownloader-{product}", level="DEBUG" if debug else "INFO"
)
self.logger.info(f"---- NOAA DOWNLOADER INITIALIZED ({product}) ----")
@property
def product_config(self) -> dict:
"""
Product configuration dictionary loaded from config file.
Returns
-------
dict
Product configuration dictionary.
"""
return self._product_config
@property
def data_types(self) -> dict:
"""
Data types configuration dictionary.
Returns
-------
dict
Dictionary of available data types and their configurations.
"""
return self.product_config["data_types"]
def list_data_types(self) -> List[str]:
"""
List all available data types for the product.
Returns
-------
List[str]
List of available data type names.
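        Examples
        --------
        The names come from the product configuration file; for NDBC they
        include 'bulk_parameters', 'wave_spectra', 'directional_spectra' and
        'wind_forecast'.

        >>> downloader.list_data_types()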
"""
return list(self.data_types.keys())
def _check_file_exists(
self, file_path: str, result: DownloadResult, force: bool, dry_run: bool
) -> bool:
"""
Check if file exists and handle accordingly.
Parameters
----------
file_path : str
Path to the file to check.
result : DownloadResult
The download result to update.
force : bool
Whether to force re-download.
dry_run : bool
If True, only check files without downloading.
Returns
-------
bool
True if should skip download (file exists or dry_run mode), False otherwise.
"""
if not force and os.path.exists(file_path):
result.add_skipped(file_path, "File already exists")
return True
if dry_run:
result.add_skipped(file_path, "File does not exist (dry run)")
return True
return False
def download_data(self, dry_run: bool = True, *args, **kwargs) -> DownloadResult:
"""
Download data for the product.
Routes to product-specific download methods based on the product type.
Parameters
----------
dry_run : bool, optional
If True, only check what would be downloaded without actually downloading.
Default is True.
*args
Arguments passed to product-specific download method.
**kwargs
Keyword arguments passed to product-specific download method.
Returns
-------
DownloadResult
Result with information about downloaded, skipped, and error files.
Raises
------
ValueError
If the product is not supported.
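        Examples
        --------
        A dry run only reports what would be fetched; keyword arguments are
        forwarded to the NDBC-specific method (buoy ID and years are
        placeholders).

        >>> result = downloader.download_data(
        ...     data_type="bulk_parameters", buoy_id="41001", years=[2023], dry_run=True
        ... )
        >>> print(result)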
"""
if self.product == "NDBC":
return self.download_data_ndbc(dry_run=dry_run, *args, **kwargs)
else:
raise ValueError(f"Download for product {self.product} not supported")
def download_data_ndbc(
self, data_type: str, dry_run: bool = True, **kwargs
) -> DownloadResult:
"""
Download data for the NDBC product.
Downloads NDBC buoy data or forecast data based on the specified data type.
Files are saved to: base_path_to_download/product/dataset/...
Parameters
----------
data_type : str
The data type to download. Available types:
- 'bulk_parameters': Standard meteorological data
- 'wave_spectra': Wave spectral density data
- 'directional_spectra': Directional wave spectra coefficients
- 'wind_forecast': GFS wind forecast data
dry_run : bool, optional
If True, only check what would be downloaded without actually downloading.
Default is True.
**kwargs
Additional keyword arguments specific to each data type:
- For bulk_parameters, wave_spectra, directional_spectra: buoy_id, years, force
- For wind_forecast: date, region, force
Returns
-------
DownloadResult
Result with information about downloaded, skipped, and error files.
Raises
------
ValueError
If the data type is not supported.
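        Examples
        --------
        Illustrative calls for two data types (buoy ID, years, date, and region
        values are placeholders):

        >>> result = downloader.download_data_ndbc(
        ...     data_type="directional_spectra", buoy_id="41001",
        ...     years=[2022, 2023], dry_run=False
        ... )
        >>> result = downloader.download_data_ndbc(
        ...     data_type="wind_forecast", date="20240101",
        ...     region=[-10.0, 0.0, 35.0, 45.0], dry_run=False
        ... )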
"""
if data_type not in self.data_types:
raise ValueError(
f"Data type {data_type} not supported. Available types: {self.list_data_types()}"
)
data_type_config = self.data_types[data_type]
dataset_config = self.product_config["datasets"][data_type_config["dataset"]]
if dry_run:
self.logger.info(f"DRY RUN: Checking files for {data_type}")
if data_type == "bulk_parameters":
result = self._download_bulk_parameters(
data_type_config, dataset_config, dry_run=dry_run, **kwargs
)
elif data_type == "wave_spectra":
result = self._download_wave_spectra(
data_type_config, dataset_config, dry_run=dry_run, **kwargs
)
elif data_type == "directional_spectra":
result = self._download_directional_spectra(
data_type_config, dataset_config, dry_run=dry_run, **kwargs
)
elif data_type == "wind_forecast":
result = self._download_wind_forecast(
data_type_config, dataset_config, dry_run=dry_run, **kwargs
)
else:
raise ValueError(f"Download for data type {data_type} not implemented")
return self.finalize_download_result(result)
def _download_bulk_parameters(
self,
data_type_config: dict,
dataset_config: dict,
buoy_id: str,
years: List[int],
force: bool = False,
dry_run: bool = False,
) -> DownloadResult:
"""
Download bulk parameters for a specific buoy and years.
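        All requested years are combined into a single CSV at
        base_path_to_download/product/dataset/buoy_id/buoy_{buoy_id}_bulk_parameters.csv,
        which is the file read back by read_bulk_parameters.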
Parameters
----------
data_type_config : dict
The configuration for the data type.
dataset_config : dict
The configuration for the dataset.
buoy_id : str
The buoy ID.
years : List[int]
The years to download data for.
force : bool, optional
Whether to force re-download even if file exists. Default is False.
dry_run : bool, optional
If True, only check what would be downloaded. Default is False.
Returns
-------
DownloadResult
Download result with information about downloaded, skipped, and error files.
"""
self.logger.info(
f"Downloading bulk parameters for buoy {buoy_id}, years {years}"
)
result = self.create_download_result()
base_url = dataset_config["base_url"]
dataset_name = data_type_config["dataset"]
try:
# Determine output file path: base_path/product/dataset/buoy_id/filename.csv
buoy_dir = os.path.join(
self.base_path_to_download, self.product, dataset_name, buoy_id
)
output_file = os.path.join(buoy_dir, f"buoy_{buoy_id}_bulk_parameters.csv")
# Check if file exists
if self._check_file_exists(output_file, result, force, dry_run):
return self.finalize_download_result(result)
# Prepare download tasks
download_tasks = []
for year in years:
urls = [
f"{base_url}/{data_type_config['url_pattern'].format(buoy_id=buoy_id, year=year)}"
]
for fallback in data_type_config.get("fallback_urls", []):
urls.append(
f"{base_url}/{fallback.format(buoy_id=buoy_id, year=year)}"
)
download_tasks.append(
{
"urls": urls,
"columns": data_type_config["columns"],
"year": year,
"buoy_id": buoy_id,
}
)
if dry_run:
# In dry run mode, just mark what would be downloaded
for task in download_tasks:
result.add_skipped(
output_file,
f"Would download year {task['year']} (dry run)",
)
return self.finalize_download_result(result)
# Execute downloads sequentially
all_data = []
for task in download_tasks:
try:
df = self._download_single_year_bulk(task["urls"], task["columns"])
if df is not None:
all_data.append(df)
self.logger.info(
f"Buoy {buoy_id}: Data found for year {task['year']}"
)
else:
self.logger.warning(
f"Buoy {buoy_id}: No data available for year {task['year']}"
)
result.add_error(
output_file,
Exception(f"No data available for year {task['year']}"),
)
except Exception as e:
self.logger.error(f"Error downloading year {task['year']}: {e}")
result.add_error(output_file, e)
if all_data:
# Combine all years
combined_df = pd.concat(all_data, ignore_index=True)
combined_df = combined_df.sort_values(["YYYY", "MM", "DD", "hh"])
# Save to CSV
os.makedirs(buoy_dir, exist_ok=True)
combined_df.to_csv(output_file, index=False)
self.logger.info(f"Data saved to {output_file}")
result.add_downloaded(output_file)
else:
self.logger.error(f"No data found for buoy {buoy_id}")
result.add_error(
output_file,
Exception(f"No data found for buoy {buoy_id}"),
)
except Exception as e:
result.add_error(output_file, e)
self.logger.error(f"Error processing data for buoy {buoy_id}: {e}")
return self.finalize_download_result(result)
def _download_single_year_bulk(
self,
urls: List[str],
columns: List[str],
) -> Optional[pd.DataFrame]:
"""
Download and parse bulk parameters for a single year.
Attempts to download from the primary URL, and if that fails, tries fallback URLs.
Handles different data formats (pre-2012 and post-2012) and validates dates.
Parameters
----------
urls : List[str]
List of URLs to try downloading from (primary URL first, then fallbacks).
columns : List[str]
List of column names for the DataFrame.
Returns
-------
Optional[pd.DataFrame]
DataFrame containing the downloaded and parsed data, or None if download fails.
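        Notes
        -----
        Two line layouts are handled (inferred from the parsing below): older
        files with a 2-digit year and no minutes column, e.g. "99 01 01 00 ...",
        are rewritten as "1999 01 01 00 00 ..." before parsing, while newer
        files already provide 18 columns including minutes.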
"""
for url in urls:
try:
# Download the file
response = requests.get(url, timeout=30)
response.raise_for_status()
content = gzip.decompress(response.content).decode("utf-8")
# Skip the header rows and read the data
data = []
lines = content.split("\n")[2:] # Skip first two lines (headers)
# Check format by looking at the first data line
first_line = next(line for line in lines if line.strip())
cols = first_line.split()
# Determine format based on number of columns and year format
has_minutes = len(cols) == 18 # Post-2012 format has 18 columns
for line in lines:
if line.strip():
parts = line.split()
if parts:
# Convert 2-digit year to 4 digits if needed
if int(parts[0]) < 100:
parts[0] = str(int(parts[0]) + 1900)
# Add minutes column if it doesn't exist
if not has_minutes:
parts.insert(4, "00")
data.append(" ".join(parts))
# Read the modified data
df = pd.read_csv(
io.StringIO("\n".join(data)),
sep=r"\s+",
names=columns,
)
# Validate dates
valid_dates = (
(df["MM"] >= 1)
& (df["MM"] <= 12)
& (df["DD"] >= 1)
& (df["DD"] <= 31)
& (df["hh"] >= 0)
& (df["hh"] <= 23)
& (df["mm"] >= 0)
& (df["mm"] <= 59)
)
df = df[valid_dates].copy()
if len(df) > 0:
return df
except Exception as e:
self.logger.debug(f"Failed to download from {url}: {e}")
continue
return None
def _download_wave_spectra(
self,
data_type_config: dict,
dataset_config: dict,
buoy_id: str,
years: List[int],
force: bool = False,
dry_run: bool = False,
) -> DownloadResult:
"""
Download wave spectra data for a specific buoy.
Downloads wave spectral density data for each specified year. Files are saved to:
base_path_to_download/product/dataset/buoy_id/wave_spectra/buoy_{buoy_id}_spectra_{year}.csv
Parameters
----------
data_type_config : dict
Configuration for the data type.
dataset_config : dict
Configuration for the dataset.
buoy_id : str
The buoy ID.
years : List[int]
List of years to download data for.
force : bool, optional
Force re-download even if file exists. Default is False.
dry_run : bool, optional
If True, only check what would be downloaded. Default is False.
Returns
-------
DownloadResult
Result with information about downloaded, skipped, and error files.
"""
self.logger.info(f"Downloading wave spectra for buoy {buoy_id}, years {years}")
result = self.create_download_result()
base_url = dataset_config["base_url"]
dataset_name = data_type_config["dataset"]
buoy_dir = os.path.join(
self.base_path_to_download,
self.product,
dataset_name,
buoy_id,
"wave_spectra",
)
if not dry_run:
os.makedirs(buoy_dir, exist_ok=True)
for year in years:
url = f"{base_url}/{data_type_config['url_pattern'].format(buoy_id=buoy_id, year=year)}"
output_file = os.path.join(buoy_dir, f"buoy_{buoy_id}_spectra_{year}.csv")
# Check if file exists
if self._check_file_exists(output_file, result, force, dry_run):
continue
if dry_run:
result.add_skipped(output_file, f"Would download year {year} (dry run)")
continue
try:
# Download and read the data
response = requests.get(url, timeout=30)
response.raise_for_status()
# Read the data
df = pd.read_csv(
io.BytesIO(response.content),
compression="gzip",
sep=r"\s+",
na_values=["MM", "99.00", "999.0"],
)
# Skip if empty or invalid data
if df.empty or len(df.columns) < 5:
self.logger.warning(f"No valid data for {buoy_id} - {year}")
result.add_error(
output_file,
Exception(f"No valid data for {buoy_id} - {year}"),
context={"year": year},
)
continue
# Save the data
df.to_csv(output_file, index=False)
result.add_downloaded(output_file)
self.logger.info(f"Successfully saved data for {buoy_id} - {year}")
except Exception as e:
self.logger.warning(f"No data found for: {buoy_id} - {year}: {e}")
result.add_error(output_file, e, context={"year": year})
continue
return result
def _download_directional_spectra(
self,
data_type_config: dict,
dataset_config: dict,
buoy_id: str,
years: List[int],
force: bool = False,
dry_run: bool = False,
) -> DownloadResult:
"""
Download directional wave spectra coefficients.
Downloads Fourier coefficients (alpha1, alpha2, r1, r2, c11) for directional wave spectra.
Files are saved to:
base_path_to_download/product/dataset/buoy_id/directional_spectra/{buoy_id}{coef}{year}.txt.gz
Parameters
----------
data_type_config : dict
Configuration for the data type.
dataset_config : dict
Configuration for the dataset.
buoy_id : str
The buoy ID.
years : List[int]
List of years to download data for.
force : bool, optional
Force re-download even if file exists. Default is False.
dry_run : bool, optional
If True, only check what would be downloaded. Default is False.
Returns
-------
DownloadResult
Result with information about downloaded, skipped, and error files.
"""
self.logger.info(
f"Downloading directional spectra for buoy {buoy_id}, years {years}"
)
result = self.create_download_result()
base_url = dataset_config["base_url"]
coefficients = data_type_config["coefficients"]
dataset_name = data_type_config["dataset"]
buoy_dir = os.path.join(
self.base_path_to_download,
self.product,
dataset_name,
buoy_id,
"directional_spectra",
)
if not dry_run:
os.makedirs(buoy_dir, exist_ok=True)
for year in years:
for coef, info in coefficients.items():
filename = f"{buoy_id}{coef}{year}.txt.gz"
url = f"{base_url}/{info['url_pattern'].format(buoy_id=buoy_id, year=year)}"
save_path = os.path.join(buoy_dir, filename)
# Check if file exists
if self._check_file_exists(save_path, result, force, dry_run):
continue
if dry_run:
result.add_skipped(
save_path,
f"Would download {info['name']} for year {year} (dry run)",
)
continue
try:
self.logger.debug(f"Downloading {info['name']} data for {year}...")
# Download the file
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
# Save the compressed file
with open(save_path, "wb") as f:
shutil.copyfileobj(response.raw, f)
result.add_downloaded(save_path)
self.logger.info(f"Successfully downloaded {filename}")
except Exception as e:
self.logger.warning(f"Error downloading {filename}: {e}")
result.add_error(save_path, e)
continue
return self.finalize_download_result(result)
def _download_wind_forecast(
self,
data_type_config: dict,
dataset_config: dict,
date: str = None,
region: List[float] = None,
force: bool = False,
dry_run: bool = False,
) -> DownloadResult:
"""
Download NOAA GFS wind forecast data.
        Downloads GFS wind forecast data for a specific date and stores the
        configured wind variables locally; the region is used only to label
        the output file.
Files are saved to:
base_path_to_download/product/dataset/{date}_{region}.nc
Parameters
----------
data_type_config : dict
Configuration for the data type.
dataset_config : dict
Configuration for the dataset.
date : str, optional
Date to download data for (format: "YYYYMMDD"). If None, uses today's date.
Default is None.
        region : List[float], optional
            Geographic region coordinates, currently used only to label the
            output file. If None, the file is labelled "global". Default is None.
force : bool, optional
Force re-download even if file exists. Default is False.
dry_run : bool, optional
If True, only check what would be downloaded. Default is False.
Returns
-------
DownloadResult
Result with information about downloaded, skipped, and error files.
Notes
-----
This method will be DEPRECATED in the future.
"""
if date is None:
date = datetime.today().strftime("%Y%m%d")
self.logger.info(f"Downloading wind forecast for date {date}")
result = self.create_download_result()
url_base = dataset_config["base_url"]
dataset_name = data_type_config["dataset"]
dbn = "gfs_0p25_1hr"
url = f"{url_base}/gfs{date}/{dbn}_00z"
# File path for local storage: base_path/product/dataset/filename.nc
forecast_dir = os.path.join(
self.base_path_to_download, self.product, dataset_name
)
if not dry_run:
os.makedirs(forecast_dir, exist_ok=True)
        # The region is only used to label the local file; fall back to a
        # "global" tag when no region is given
        region_tag = "_".join(map(str, region)) if region is not None else "global"
        file_path = os.path.join(forecast_dir, f"{date}_{region_tag}.nc")
# Check if file exists
if self._check_file_exists(file_path, result, force, dry_run):
return result
if dry_run:
result.add_skipped(
file_path, f"Would download wind forecast for {date} (dry run)"
)
return result
try:
self.logger.info(f"Downloading and cropping forecast data from: {url}")
# Open dataset from URL
data = xr.open_dataset(url)
# Select only wind data
variables = data_type_config["variables"]
data_select = data[variables]
self.logger.info(f"Storing local copy at: {file_path}")
data_select.to_netcdf(file_path)
result.add_downloaded(file_path)
except Exception as e:
self.logger.error(f"Error downloading wind forecast: {e}")
result.add_error(file_path, e)
return self.finalize_download_result(result)