# Source code for bluemath_tk.downloaders.copernicus.copernicus_downloader

import calendar
import json
import os
from typing import List, Optional

import cdsapi
import xarray as xr

from .._base_downloaders import BaseDownloader

# Default CDS API configuration. The "key" value is a placeholder and is
# normally overridden by the `token` argument of CopernicusDownloader.
config = {
    "url": "https://cds.climate.copernicus.eu/api",  # new CDS endpoint; presumably no /v2 suffix needed — TODO confirm against cdsapi docs
    "key": "your-api-token",  # placeholder; supply a real CDS API token instead
}


class CopernicusDownloader(BaseDownloader):
    """
    Main class to download data from the Copernicus Climate Data Store (CDS).

    Attributes
    ----------
    product : str
        The product to download data from. Currently only ERA5 is supported.
    product_config : dict
        The configuration for the product to download data from.
    client : cdsapi.Client
        The client to interact with the Copernicus Climate Data Store API.
        ``None`` when the downloader runs in check-only mode.

    Notes
    -----
    - The configuration for the products is stored in the `products_configs`
      attribute.
    - The configuration for the ERA5 product is stored in the `ERA5` folder.
    """

    # Product configuration files, loaded once at class-creation time.
    # A context manager is used so the config file handle is closed —
    # json.load(open(...)) used to leak it.
    with open(
        os.path.join(os.path.dirname(__file__), "ERA5", "ERA5_config.json")
    ) as _config_file:
        products_configs = {"ERA5": json.load(_config_file)}
    del _config_file  # do not keep the file object as a class attribute

    def __init__(
        self,
        product: str,
        base_path_to_download: str,
        token: Optional[str] = None,
        debug: bool = True,
        check: bool = False,
    ) -> None:
        """
        Constructor for the CopernicusDownloader class.

        Parameters
        ----------
        product : str
            The product to download data from. Currently only ERA5 is supported.
        base_path_to_download : str
            The base path to download the data to.
        token : str, optional
            The API token to use to download data. Defaults to None, in which
            case the module-level ``config["key"]`` placeholder is used.
        debug : bool, optional
            Whether to run in debug mode. Defaults to True.
        check : bool, optional
            Whether to just check the data (no API client is created).
            Defaults to False.

        Raises
        ------
        ValueError
            If the product configuration is not found.
        """
        super().__init__(
            base_path_to_download=base_path_to_download, debug=debug, check=check
        )
        self._product = product
        self._product_config = self.products_configs.get(product)
        if self._product_config is None:
            raise ValueError(f"{product} configuration not found")
        self.set_logger_name(f"CopernicusDownloader - {product}")
        # In check-only mode no API client is created; expose None instead of
        # leaving the attribute unset (which made `client` raise AttributeError).
        self._client: Optional[cdsapi.Client] = None
        if not self.check:
            self._client = cdsapi.Client(
                url=config["url"], key=token or config["key"], debug=self.debug
            )
            self.logger.info("---- DOWNLOADING DATA ----")
        else:
            self.logger.info("---- CHECKING DATA ----")

    @property
    def product(self) -> str:
        """The product this downloader serves (e.g. "ERA5")."""
        return self._product

    @property
    def product_config(self) -> dict:
        """The configuration dictionary for the selected product."""
        return self._product_config

    @property
    def client(self) -> Optional[cdsapi.Client]:
        """The cdsapi client, or None in check-only mode."""
        return self._client
[docs] def list_variables(self, type: str = None) -> List[str]: """ Lists the variables available for the product. Filtering by type if provided. Parameters ---------- type : str, optional The type of variables to list. Default is None. Returns ------- List[str] The list of variables available for the product. """ if type == "ocean": return [ var_name for var_name, var_info in self.product_config["variables"].items() if var_info["type"] == "ocean" ] return list(self.product_config["variables"].keys())
[docs] def list_datasets(self) -> List[str]: """ Lists the datasets available for the product. Returns ------- List[str] The list of datasets available for the product. """ return list(self.product_config["datasets"].keys())
[docs] def show_markdown_table(self) -> None: """ Create a Markdown table from the configuration dictionary and print it. """ # Define the table headers headers = ["name", "long_name", "units", "type"] header_line = "| " + " | ".join(headers) + " |" separator_line = ( "| " + " | ".join(["-" * len(header) for header in headers]) + " |" ) # Initialize the table with headers table_lines = [header_line, separator_line] # Add rows for each variable for var_name, var_info in self.product_config["variables"].items(): long_name = var_info.get("long_name", "") units = var_info.get("units", "") type = var_info.get("type", "") row = f"| {var_name} | {long_name} | {units} | {type} |" table_lines.append(row) # Print the table print("\n".join(table_lines))
[docs] def download_data(self, *args, **kwargs) -> str: """ Downloads the data for the product. Parameters ---------- *args The arguments to pass to the download function. **kwargs The keyword arguments to pass to the download function. Returns ------- str The message with the fully downloaded files and the not fully downloaded files. Raises ------ ValueError If the product is not supported. """ if self.product == "ERA5": return self.download_data_era5(*args, **kwargs) else: raise ValueError(f"Download for product {self.product} not supported")
    def download_data_era5(
        self,
        variables: List[str],
        years: List[str],
        months: List[str],
        day: List[str] = None,
        time: List[str] = None,
        product_type: str = "reanalysis",
        data_format: str = "netcdf_legacy",
        download_format: str = "unarchived",
        force: bool = False,
    ) -> str:
        """
        Downloads the data for the ERA5 product.

        Parameters
        ----------
        variables : List[str]
            The list of variables to download. Leave empty to download all
            variables defined in the product configuration.
        years : List[str]
            The list of years to download.
        months : List[str]
            The list of months to download.
        day : List[str], optional
            The list of days to download. Defaults to None (all days 01-31).
        time : List[str], optional
            The list of times to download. Defaults to None (all 24 hours).
        product_type : str, optional
            The product type to download. Defaults to "reanalysis".
        data_format : str, optional
            The data format to download. Defaults to "netcdf_legacy". This
            format is maintained for legacy reasons to allow the Thredds
            server to read the files.
        download_format : str, optional
            The download format to use. Defaults to "unarchived".
        force : bool, optional
            Whether to force the download even if a complete file already
            exists. Defaults to False.

        Returns
        -------
        str
            The message with the fully downloaded files and the not fully
            downloaded files. Error files are also included.

        Raises
        ------
        ValueError
            If the variables, years, months, day, time, product_type,
            data_format, or download_format are not valid or if the force is
            not a boolean.

        Notes
        -----
        - The variables, years, months, day, and time must be lists of strings.
        - The product_type, data_format, and download_format must be strings.
        - The force must be a boolean.
        """
        # ---- Argument validation (fail fast with explicit messages) ----
        if not isinstance(variables, list):
            raise ValueError("Variables must be a list of strings")
        elif len(variables) == 0:
            # Empty list means "everything the product configuration knows".
            variables = list(self.product_config["variables"].keys())
            self.logger.info(f"Variables not provided. Using {variables}")
        if not isinstance(years, list) or len(years) == 0:
            raise ValueError("Years must be a non-empty list of strings")
        if not isinstance(months, list) or len(months) == 0:
            raise ValueError("Months must be a non-empty list of strings")
        if day is not None:
            if not isinstance(day, list) or len(day) == 0:
                raise ValueError("Day must be a non-empty list of strings")
        else:
            # Default to every possible day; the CDS ignores days that do not
            # exist in a given month.
            day = [f"{day:02d}" for day in range(1, 32)]
            self.logger.info(f"Day not provided. Using {day}")
        if time is not None:
            if not isinstance(time, list) or len(time) == 0:
                raise ValueError("Time must be a non-empty list of strings")
        else:
            # Default to all 24 hourly time steps ("00:00" ... "23:00").
            time = [f"{hour:02d}:00" for hour in range(24)]
            self.logger.info(f"Time not provided. Using {time}")
        if not isinstance(product_type, str):
            raise ValueError("Product type must be a string")
        if not isinstance(data_format, str):
            raise ValueError("Data format must be a string")
        if not isinstance(download_format, str):
            raise ValueError("Download format must be a string")
        if not isinstance(force, bool):
            raise ValueError("Force must be a boolean")

        # Bookkeeping for the summary string returned at the end.
        fully_downloaded_files: List[str] = []
        NOT_fullly_downloaded_files: List[str] = []
        error_files: List[str] = []

        # One CDS request (and one output file) per variable / year / month.
        for variable in variables:
            for year in years:
                year = f"{int(year):04d}"  # Ensure year is 4 digits
                for month in months:
                    month = f"{int(month):02d}"  # Ensure month is 2 digits
                    variable_config = self.product_config["variables"].get(variable)
                    if variable_config is None:
                        self.logger.error(
                            f"Variable {variable} not found in product configuration file"
                        )
                        continue
                    variable_dataset = self.product_config["datasets"].get(
                        variable_config["dataset"]
                    )
                    if variable_dataset is None:
                        self.logger.error(
                            f"Dataset {variable_config['dataset']} not found in product configuration file"
                        )
                        continue
                    # Build the CDS request from the dataset template; copy so
                    # the shared template in the config is never mutated.
                    template_for_variable = variable_dataset["template"].copy()
                    template_for_variable["variable"] = variable_config["cds_name"]
                    template_for_variable["year"] = year
                    template_for_variable["month"] = month
                    template_for_variable["day"] = day
                    template_for_variable["time"] = time
                    template_for_variable["product_type"] = product_type
                    template_for_variable["data_format"] = data_format
                    template_for_variable["download_format"] = download_format
                    # Fill any mandatory field still missing from the template
                    # with the per-variable value; skip the variable entirely
                    # if the variable config cannot supply it either.
                    skip_because_of_manadatory_fields = False
                    for mandatory_field in variable_dataset["mandatory_fields"]:
                        try:
                            if template_for_variable.get(mandatory_field) is None:
                                template_for_variable[mandatory_field] = (
                                    variable_config[mandatory_field]
                                )
                        except KeyError:
                            self.logger.error(
                                f"Mandotory field {mandatory_field} not found in variable configuration file for {variable}"
                            )
                            skip_because_of_manadatory_fields = True
                    if skip_because_of_manadatory_fields:
                        continue
                    # Create the output file name once request is properly formatted
                    output_nc_file = os.path.join(
                        self.base_path_to_download,
                        self.product,
                        variable_config["dataset"],
                        variable_config["type"],
                        product_type,
                        variable_config["nc_name"],
                        year,
                        f"{variable_config['nc_name']}_{year}{month}.nc",
                    )
                    # Create the output directory if it does not exist
                    os.makedirs(os.path.dirname(output_nc_file), exist_ok=True)
                    self.logger.info(f"""
                    Analyzing {output_nc_file}
                    """)
                    try:
                        if self.check or not force:
                            if os.path.exists(output_nc_file):
                                # File already on disk: verify completeness by
                                # checking its last time step reaches the last
                                # hour of the month.
                                self.logger.debug(
                                    f"Checking {output_nc_file} file is complete"
                                )
                                try:
                                    nc = xr.open_dataset(output_nc_file)
                                    _, last_day = calendar.monthrange(
                                        int(year), int(month)
                                    )
                                    last_hour = f"{year}-{int(month):02d}-{last_day}T23"
                                    last_hour_nc = str(nc.time[-1].values)
                                    nc.close()
                                    if last_hour not in last_hour_nc:
                                        # Incomplete file: record it (check
                                        # mode) or re-download it.
                                        self.logger.debug(
                                            f"{output_nc_file} ends at {last_hour_nc} instead of {last_hour}"
                                        )
                                        if self.check:
                                            NOT_fullly_downloaded_files.append(
                                                output_nc_file
                                            )
                                        else:
                                            self.logger.debug(
                                                f"Downloading: {variable} to {output_nc_file} because it is not complete"
                                            )
                                            self.client.retrieve(
                                                name=variable_config["dataset"],
                                                request=template_for_variable,
                                                target=output_nc_file,
                                            )
                                            fully_downloaded_files.append(
                                                output_nc_file
                                            )
                                    else:
                                        fully_downloaded_files.append(output_nc_file)
                                except Exception as e:
                                    # Unreadable/corrupt file: treat it as not
                                    # downloaded and re-fetch unless checking.
                                    self.logger.error(
                                        f"Error was raised opening {output_nc_file} - {e}, re-downloading..."
                                    )
                                    if self.check:
                                        NOT_fullly_downloaded_files.append(
                                            output_nc_file
                                        )
                                    else:
                                        self.logger.debug(
                                            f"Downloading: {variable} to {output_nc_file} because it is not complete"
                                        )
                                        self.client.retrieve(
                                            name=variable_config["dataset"],
                                            request=template_for_variable,
                                            target=output_nc_file,
                                        )
                                        fully_downloaded_files.append(output_nc_file)
                            elif self.check:
                                # Check mode and the file does not exist.
                                NOT_fullly_downloaded_files.append(output_nc_file)
                            else:
                                # File missing: download it.
                                self.logger.debug(
                                    f"Downloading: {variable} to {output_nc_file}"
                                )
                                self.client.retrieve(
                                    name=variable_config["dataset"],
                                    request=template_for_variable,
                                    target=output_nc_file,
                                )
                                fully_downloaded_files.append(output_nc_file)
                        else:
                            # force=True: always download, overwriting any
                            # existing file.
                            self.logger.debug(
                                f"Downloading: {variable} to {output_nc_file}"
                            )
                            self.client.retrieve(
                                name=variable_config["dataset"],
                                request=template_for_variable,
                                target=output_nc_file,
                            )
                            fully_downloaded_files.append(output_nc_file)
                    except Exception as e:
                        # Any failure (API, network, disk) skips this file but
                        # keeps the overall loop going.
                        self.logger.error(f"""
                        Skippping {output_nc_file} for {e}
                        """)
                        error_files.append(output_nc_file)

        # Assemble the human-readable summary of what happened.
        fully_downloaded_files_str = "\n".join(fully_downloaded_files)
        NOT_fullly_downloaded_files_str = "\n".join(NOT_fullly_downloaded_files)
        error_files = "\n".join(error_files)

        return f"""
        Fully downloaded files:
        {fully_downloaded_files_str}
        Not fully downloaded files:
        {NOT_fullly_downloaded_files_str}
        Error files:
        {error_files}
        """
if __name__ == "__main__":
    # Example usage: download a few ERA5 variables for January 2021.
    downloader = CopernicusDownloader(
        product="ERA5",
        base_path_to_download="/home/tausiaj/DATA/Copernicus/",
        debug=True,
        check=False,
    )
    downloader.download_data(
        variables=["geo500", "tp", "p140122"],
        years=["2021"],
        months=["01"],
    )