import calendar
import json
import os
from typing import Any, Dict, List, Optional

import cdsapi

from .._base_downloaders import BaseDownloader
from .._download_result import DownloadResult
[docs]
def _load_product_config(product_name: str) -> Dict[str, Any]:
    """
    Load the JSON configuration shipped next to this module for one product.

    The file is read from
    ``<directory of this module>/<product_name>/<product_name>_config.json``.

    Parameters
    ----------
    product_name : str
        Product identifier; used both as sub-directory name and as file-name
        prefix.

    Returns
    -------
    Dict[str, Any]
        Parsed content of the configuration file.
    """
    config_path = os.path.join(
        os.path.dirname(__file__), product_name, f"{product_name}_config.json"
    )
    # Use a context manager so the file handle is closed right after parsing
    # instead of being left open until garbage collection.
    with open(config_path, encoding="utf-8") as config_file:
        return json.load(config_file)


class CopernicusDownloader(BaseDownloader):
    """
    Simple downloader for Copernicus Climate Data Store.

    Attributes
    ----------
    products_configs : dict
        Maps each product name ("ERA5", "CERRA") to the configuration parsed
        from that product's JSON file. Loaded once, when the class is created.

    Examples
    --------
    >>> downloader = CopernicusDownloader(
    ...     product="ERA5",
    ...     base_path_to_download="./copernicus_data",
    ...     api_key="your_api_key",
    ... )
    >>> result = downloader.download_data(
    ...     variables=["swh"],
    ...     years=["2020"],
    ...     months=["01"],
    ...     force=False,
    ...     dry_run=False,
    ... )
    """

    products_configs = {
        "ERA5": _load_product_config("ERA5"),
        "CERRA": _load_product_config("CERRA"),
    }
def __init__(
    self,
    product: str,
    base_path_to_download: str,
    api_key: str,
    debug: bool = True,
) -> None:
    """
    Set up the downloader for a single Copernicus product.

    The base class is initialized first; afterwards the product configuration
    is looked up, the logger is renamed and the CDS API client is created.

    Parameters
    ----------
    product : str
        The product to download data from (e.g., "ERA5", "CERRA").
    base_path_to_download : str
        Base path where downloaded files will be stored.
    api_key : str
        Copernicus CDS API key, handed to the CDS client.
    debug : bool, optional
        If True, the logger level is "DEBUG", otherwise "INFO".
        Default is True.

    Raises
    ------
    ValueError
        If ``product`` has no entry in ``products_configs``, or if its
        configuration has no "url" entry.
    """
    super().__init__(
        product=product, base_path_to_download=base_path_to_download, debug=debug
    )

    product_config = self.products_configs.get(product)
    self._product_config = product_config
    if product_config is None:
        raise ValueError(
            f"Product '{product}' not found. Available: {list(self.products_configs.keys())}"
        )

    log_level = "DEBUG" if debug else "INFO"
    self.set_logger_name(f"CopernicusDownloader-{product}", level=log_level)

    # The CDS endpoint is taken from the product configuration.
    server_url = product_config.get("url")
    if server_url is None:
        raise ValueError("Server URL not found in product configuration")

    self._client = cdsapi.Client(url=server_url, key=api_key, debug=self.debug)
    self.logger.info(f"---- COPERNICUS DOWNLOADER INITIALIZED ({product}) ----")
@property
def product_config(self) -> dict:
    """
    Configuration of the product this downloader was created for.

    Returns
    -------
    dict
        The configuration stored on the instance as ``_product_config``.
    """
    return self._product_config
@property
def client(self) -> cdsapi.Client:
    """
    CDS API client held by this downloader.

    Returns
    -------
    cdsapi.Client
        The client stored on the instance as ``_client``.
    """
    return self._client
[docs]
def list_variables(self, type: Optional[str] = None) -> List[str]:
    """
    List variables available for the product.

    Parameters
    ----------
    type : str, optional
        If given, only variables whose configured "type" equals this value
        are returned (e.g., "ocean"). If None or empty, no filtering is
        applied. Default is None.

    Returns
    -------
    List[str]
        Variable names, in the order they appear in the product configuration.
    """
    variables = self.product_config["variables"]
    if not type:
        return list(variables)
    # Compare against the requested type itself rather than a hard-coded
    # "ocean", so any configured type can be used as a filter. Variables
    # without a "type" entry simply never match.
    return [
        var_name
        for var_name, var_info in variables.items()
        if var_info.get("type") == type
    ]
[docs]
def download_data(
    self,
    dry_run: bool = True,
    *args,
    **kwargs,
) -> DownloadResult:
    """
    Download data for the product.

    Forwards the call to the download method that matches ``self.product``.

    Parameters
    ----------
    dry_run : bool, optional
        If True, only check what would be downloaded without actually
        downloading. Default is True.
    *args
        Positional arguments forwarded to the product-specific method.
    **kwargs
        Keyword arguments forwarded to the product-specific method.

    Returns
    -------
    DownloadResult
        Whatever the product-specific download method returns.

    Raises
    ------
    ValueError
        If ``self.product`` is neither "ERA5" nor "CERRA".
    """
    product_handlers = {
        "ERA5": self.download_data_era5,
        "CERRA": self.download_data_cerra,
    }
    handler = product_handlers.get(self.product)
    if handler is None:
        raise ValueError(f"Download for product {self.product} not supported")
    return handler(*args, dry_run=dry_run, **kwargs)
[docs]
def download_data_era5(
    self,
    variables: List[str],
    years: List[str],
    months: List[str],
    days: List[str] = None,
    times: List[str] = None,
    area: List[float] = None,
    product_type: str = "reanalysis",
    data_format: str = "netcdf",
    download_format: str = "unarchived",
    force: bool = False,
    dry_run: bool = True,
) -> DownloadResult:
    """
    Download ERA5 data.

    One download task is prepared per (variable, year) pair and the tasks are
    then processed one after another. Files are saved to:
    base_path_to_download/product/dataset/type/product_type/variable/filename.nc

    Parameters
    ----------
    variables : List[str]
        Variable names to download. If empty (or not a list), every variable
        of the product configuration is used.
    years : List[str]
        Years to download (e.g., ["2020", "2021"]). Normalized to "YYYY".
    months : List[str]
        Months to download (e.g., ["01", "02"]). Normalized to "MM".
    days : List[str], optional
        Days to download. If None, "01" to "31" are used. Default is None.
    times : List[str], optional
        Times to download (e.g., ["00:00", "12:00"]). If None, every full
        hour from "00:00" to "23:00" is used. Default is None.
    area : List[float], optional
        Geographic area as [north, west, south, east]. If None, no area is
        added to the request. Default is None.
    product_type : str, optional
        Product type (e.g., "reanalysis", "ensemble_mean").
        Default is "reanalysis".
    data_format : str, optional
        Data format. Default is "netcdf".
    download_format : str, optional
        Download format. Default is "unarchived".
    force : bool, optional
        Force re-download even if file exists. Default is False.
    dry_run : bool, optional
        If True, only check what would be downloaded. Default is True.

    Returns
    -------
    DownloadResult
        Merged outcome of all processed tasks.

    Raises
    ------
    ValueError
        If ``years`` or ``months`` is not a non-empty list, or contains a
        value that cannot be converted to an integer.
    """
    # An empty or non-list selection means "every configured variable".
    if not (isinstance(variables, list) and variables):
        variables = list(self.product_config["variables"].keys())

    if not (isinstance(years, list) and years):
        raise ValueError("Years must be a non-empty list")
    years = [f"{int(year):04d}" for year in years]

    if not (isinstance(months, list) and months):
        raise ValueError("Months must be a non-empty list")
    months = [f"{int(month):02d}" for month in months]
    last_month = months[-1]

    days = [f"{day:02d}" for day in range(1, 32)] if days is None else days
    times = [f"{hour:02d}:00" for hour in range(24)] if times is None else times

    result = self.create_download_result()

    # One candidate per (variable, year); invalid combinations come back as
    # None and are dropped.
    candidates = [
        self._prepare_era5_download_task(
            variable=variable,
            year=year,
            months=months,
            days=days,
            times=times,
            area=area,
            product_type=product_type,
            data_format=data_format,
            download_format=download_format,
            last_month=last_month,
        )
        for variable in variables
        for year in years
    ]
    download_tasks = [task for task in candidates if task is not None]

    if not download_tasks:
        return self.finalize_download_result(
            result, "No valid download tasks found"
        )
    self.logger.info(f"Prepared {len(download_tasks)} download tasks")

    # Process the tasks sequentially and merge each outcome into `result`.
    for task in download_tasks:
        outcome = self._download_single_file(task, force=force, dry_run=dry_run)
        if not isinstance(outcome, DownloadResult):
            continue
        result.downloaded_files.extend(outcome.downloaded_files)
        result.skipped_files.extend(outcome.skipped_files)
        result.error_files.extend(outcome.error_files)
        result.errors.extend(outcome.errors)

    return self.finalize_download_result(result)
[docs]
def download_data_cerra(
    self,
    variables: List[str],
    years: List[str],
    months: List[str],
    days: List[str] = None,
    times: List[str] = None,
    area: List[float] = None,
    level_type: str = "surface_or_atmosphere",
    data_type: List[str] = None,
    product_type: str = "analysis",
    data_format: str = "netcdf",
    force: bool = False,
    dry_run: bool = True,
) -> DownloadResult:
    """
    Download CERRA data.

    One download task is prepared per (variable, year) pair and the tasks are
    then processed one after another. Files are saved to:
    base_path_to_download/product/dataset/type/product_type/variable/filename.nc

    Parameters
    ----------
    variables : List[str]
        Variable names to download. If empty (or not a list), every variable
        of the product configuration is used.
    years : List[str]
        Years to download (e.g., ["2020", "2021"]). Normalized to "YYYY".
    months : List[str]
        Months to download (e.g., ["01", "02"]). Normalized to "MM".
    days : List[str], optional
        Days to download. If None, "01" to "31" are used. Default is None.
    times : List[str], optional
        Times to download (e.g., ["00:00", "12:00"]). If None, the eight
        3-hourly times 00:00, 03:00, ..., 21:00 are used. Default is None.
    area : List[float], optional
        Geographic area as [north, west, south, east]. If None, no area is
        added to the request. Default is None.
    level_type : str, optional
        Level type. Default is "surface_or_atmosphere".
    data_type : List[str], optional
        Data type. If None, ["reanalysis"] is used. Default is None.
    product_type : str, optional
        Product type (e.g., "analysis", "forecast"). Default is "analysis".
    data_format : str, optional
        Data format. Default is "netcdf".
    force : bool, optional
        Force re-download even if file exists. Default is False.
    dry_run : bool, optional
        If True, only check what would be downloaded. Default is True.

    Returns
    -------
    DownloadResult
        Merged outcome of all processed tasks.

    Raises
    ------
    ValueError
        If ``years`` or ``months`` is not a non-empty list, or contains a
        value that cannot be converted to an integer.
    """
    # An empty or non-list selection means "every configured variable".
    if not (isinstance(variables, list) and variables):
        variables = list(self.product_config["variables"].keys())

    if not (isinstance(years, list) and years):
        raise ValueError("Years must be a non-empty list")
    years = [f"{int(year):04d}" for year in years]

    if not (isinstance(months, list) and months):
        raise ValueError("Months must be a non-empty list")
    months = [f"{int(month):02d}" for month in months]
    last_month = months[-1]

    days = [f"{day:02d}" for day in range(1, 32)] if days is None else days
    if times is None:
        times = ["00:00", "03:00", "06:00", "09:00", "12:00", "15:00", "18:00", "21:00"]
    if data_type is None:
        data_type = ["reanalysis"]

    result = self.create_download_result()

    # One candidate per (variable, year); invalid combinations come back as
    # None and are dropped.
    candidates = [
        self._prepare_cerra_download_task(
            variable=variable,
            year=year,
            months=months,
            days=days,
            times=times,
            area=area,
            level_type=level_type,
            data_type=data_type,
            product_type=product_type,
            data_format=data_format,
            last_month=last_month,
        )
        for variable in variables
        for year in years
    ]
    download_tasks = [task for task in candidates if task is not None]

    if not download_tasks:
        return self.finalize_download_result(
            result, "No valid download tasks found"
        )
    self.logger.info(f"Prepared {len(download_tasks)} download tasks")

    # Process the tasks sequentially and merge each outcome into `result`.
    for task in download_tasks:
        outcome = self._download_single_file(task, force=force, dry_run=dry_run)
        if not isinstance(outcome, DownloadResult):
            continue
        result.downloaded_files.extend(outcome.downloaded_files)
        result.skipped_files.extend(outcome.skipped_files)
        result.error_files.extend(outcome.error_files)
        result.errors.extend(outcome.errors)

    return self.finalize_download_result(result)
def _prepare_era5_download_task(
self,
variable: str,
year: str,
months: List[str],
days: List[str],
times: List[str],
area: Optional[List[float]],
product_type: str,
data_format: str,
download_format: str,
last_month: str,
) -> Optional[Dict[str, Any]]:
"""
Prepare a download task for ERA5.
Creates a task dictionary with all necessary information for downloading
a single variable for a single year.
Parameters
----------
variable : str
Variable name.
year : str
Year (formatted as "YYYY").
months : List[str]
List of months (formatted as "MM").
days : List[str]
List of days (formatted as "DD").
times : List[str]
List of times (formatted as "HH:MM").
area : Optional[List[float]]
Geographic area as [north, west, south, east] or None.
product_type : str
Product type.
data_format : str
Data format.
download_format : str
Download format.
last_month : str
Last month in the list (used for date range formatting).
Returns
-------
Optional[Dict[str, Any]]
Task dictionary with download information, or None if configuration is invalid.
"""
variable_config = self.product_config["variables"].get(variable)
if variable_config is None:
self.logger.error(f"Variable {variable} not found in configuration")
return None
variable_dataset = self.product_config["datasets"].get(
variable_config["dataset"]
)
if variable_dataset is None:
self.logger.error(
f"Dataset {variable_config['dataset']} not found in configuration"
)
return None
template_for_variable = variable_dataset["template"].copy()
if variable == "spectra":
template_for_variable["date"] = (
f"{year}-{months[0]}-01/to/{year}-{months[-1]}-31"
)
if area is not None:
template_for_variable["area"] = "/".join([str(coord) for coord in area])
else:
template_for_variable["variable"] = variable_config["cds_name"]
template_for_variable["year"] = year
template_for_variable["month"] = months
template_for_variable["day"] = days
template_for_variable["time"] = times
template_for_variable["product_type"] = product_type
template_for_variable["data_format"] = data_format
template_for_variable["download_format"] = download_format
if area is not None:
template_for_variable["area"] = area
# Check mandatory fields
for mandatory_field in variable_dataset["mandatory_fields"]:
if template_for_variable.get(mandatory_field) is None:
try:
template_for_variable[mandatory_field] = variable_config[
mandatory_field
]
except KeyError:
self.logger.error(
f"Mandatory field {mandatory_field} not found for {variable}"
)
return None
# Create output file path
output_nc_file = os.path.join(
self.base_path_to_download,
self.product,
variable_config["dataset"],
variable_config["type"],
product_type,
variable_config["cds_name"],
f"{variable_config['nc_name']}_{year}_{'_'.join(months)}.nc",
)
return {
"variable": variable,
"year": year,
"variable_config": variable_config,
"variable_dataset": variable_dataset,
"template": template_for_variable,
"output_file": output_nc_file,
"last_month": last_month,
}
def _prepare_cerra_download_task(
self,
variable: str,
year: str,
months: List[str],
days: List[str],
times: List[str],
area: Optional[List[float]],
level_type: str,
data_type: List[str],
product_type: str,
data_format: str,
last_month: str,
) -> Optional[Dict[str, Any]]:
"""
Prepare a download task for CERRA.
Creates a task dictionary with all necessary information for downloading
a single variable for a single year.
Parameters
----------
variable : str
Variable name.
year : str
Year (formatted as "YYYY").
months : List[str]
List of months (formatted as "MM").
days : List[str]
List of days (formatted as "DD").
times : List[str]
List of times (formatted as "HH:MM").
area : Optional[List[float]]
Geographic area as [north, west, south, east] or None.
level_type : str
Level type.
data_type : List[str]
Data type list.
product_type : str
Product type.
data_format : str
Data format.
last_month : str
Last month in the list (used for date range formatting).
Returns
-------
Optional[Dict[str, Any]]
Task dictionary with download information, or None if configuration is invalid.
"""
variable_config = self.product_config["variables"].get(variable)
if variable_config is None:
self.logger.error(f"Variable {variable} not found in configuration")
return None
variable_dataset = self.product_config["datasets"].get(
variable_config["dataset"]
)
if variable_dataset is None:
self.logger.error(
f"Dataset {variable_config['dataset']} not found in configuration"
)
return None
template_for_variable = variable_dataset["template"].copy()
template_for_variable["variable"] = [variable_config["cds_name"]]
template_for_variable["level_type"] = level_type
template_for_variable["data_type"] = data_type
template_for_variable["product_type"] = product_type
template_for_variable["year"] = [year]
template_for_variable["month"] = months
template_for_variable["day"] = days
template_for_variable["time"] = times
template_for_variable["data_format"] = data_format
if area is not None:
template_for_variable["area"] = area
# Check mandatory fields
for mandatory_field in variable_dataset["mandatory_fields"]:
if template_for_variable.get(mandatory_field) is None:
self.logger.error(
f"Mandatory field {mandatory_field} not found for {variable}"
)
return None
# Create output file path
output_nc_file = os.path.join(
self.base_path_to_download,
self.product,
variable_config["dataset"],
variable_config["type"],
product_type,
variable_config["cds_name"],
f"{variable_config['nc_name']}_{year}_{'_'.join(months)}.nc",
)
return {
"variable": variable,
"year": year,
"variable_config": variable_config,
"template": template_for_variable,
"last_month": last_month,
"output_file": output_nc_file,
}
def _download_single_file(
    self, task: Dict[str, Any], force: bool = False, dry_run: bool = True
) -> DownloadResult:
    """
    Download a single file based on a task dictionary.

    Parameters
    ----------
    task : Dict[str, Any]
        Task dictionary; the keys "output_file", "variable",
        "variable_config" and "template" are read.
    force : bool, optional
        Force re-download even if file exists. Default is False.
    dry_run : bool, optional
        If True, only check what would be downloaded. Default is True.

    Returns
    -------
    DownloadResult
        Result holding exactly one entry for the output file: downloaded,
        skipped or error. Any exception raised while creating the target
        directory or retrieving the file is logged and recorded as an error
        instead of being propagated.
    """
    result = DownloadResult()
    output_file = task["output_file"]
    variable = task["variable"]
    variable_config = task["variable_config"]
    template = task["template"]

    try:
        # An already existing file is never fetched again unless `force`.
        if not force and os.path.exists(output_file):
            if dry_run:
                result.add_skipped(output_file, "File already exists (dry run)")
            else:
                # NOTE(review): an existing file is reported as "downloaded",
                # not "skipped" - presumably so the result lists every
                # available file; confirm this is intended.
                result.add_downloaded(output_file)
            return result

        if dry_run:
            result.add_skipped(output_file, f"Would download {variable} (dry run)")
            return result

        # Create the target directory only when a download really happens,
        # and inside the try block so that a failure here is recorded for
        # this file like any other download error.
        os.makedirs(os.path.dirname(output_file), exist_ok=True)

        self.logger.debug(f"Downloading: {variable} to {output_file}")
        self.client.retrieve(
            name=variable_config["dataset"],
            request=template,
            target=output_file,
        )
        result.add_downloaded(output_file)
        self.logger.info(f"Downloaded: {output_file}")
    except Exception as e:
        self.logger.error(f"Error downloading {output_file}: {e}")
        result.add_error(output_file, e)

    return result