# Source code for bluemath_tk.core.dask

import os
from typing import Optional, Union

import psutil
from dask.distributed import Client, LocalCluster


def get_total_ram() -> int:
    """
    Return the total physical memory of the machine.

    Returns
    -------
    int
        Total RAM in bytes, taken from the ``total`` field of
        ``psutil.virtual_memory()``.
    """

    memory_stats = psutil.virtual_memory()
    return memory_stats.total


def get_available_ram() -> int:
    """
    Return the memory currently available on the machine.

    Returns
    -------
    int
        Available RAM in bytes, taken from the ``available`` field of
        ``psutil.virtual_memory()``.
    """

    memory_stats = psutil.virtual_memory()
    return memory_stats.available


def setup_dask_client(
    n_workers: Optional[int] = None,
    memory_limit: Union[float, str] = 0.5,
) -> "Client":
    """
    Setup a Dask client with controlled resources.

    Parameters
    ----------
    n_workers : int, optional
        Number of workers. If None (default), the value of the
        ``BLUEMATH_NUM_WORKERS`` environment variable is used, falling back
        to 1 when the variable is not set.
    memory_limit : float or str, optional
        Memory limit per worker, forwarded to ``LocalCluster``. A float in
        the range (-inf, 1] is treated as a fraction and is first rescaled by
        the ratio of available to total RAM (see Notes). Any other value
        (e.g. a string such as ``"4GB"`` or a byte count) is passed through
        unchanged. Default is 0.5.

    Returns
    -------
    Client
        Dask distributed client connected to a new ``LocalCluster`` whose
        workers each run a single thread.

    Raises
    ------
    ValueError
        If ``n_workers`` is None and ``BLUEMATH_NUM_WORKERS`` is not a valid
        integer.

    Notes
    -----
    - According to the Dask ``LocalCluster`` documentation, a float
      ``memory_limit`` <= 1 is interpreted as a fraction of the system's
      total memory. Rescaling it by ``available / total`` here makes the
      limit effectively a fraction of the RAM available when this function
      is called.
    - The limit applies to each worker, so the combined limit of all
      workers can exceed the available RAM when ``n_workers`` is large.
    - Resources might vary depending on the hardware and the load of the
      machine. Be very careful when setting the number of workers and memory
      limit, as it might affect the performance of the machine, or in the
      worst-case scenario, the performance of other users in the same
      machine (cluster case).
    """

    if n_workers is None:
        raw_workers = os.environ.get("BLUEMATH_NUM_WORKERS", "1")
        try:
            n_workers = int(raw_workers)
        except ValueError as err:
            raise ValueError(
                f"BLUEMATH_NUM_WORKERS must be an integer, got {raw_workers!r}"
            ) from err

    # Only fractions are rescaled: Dask treats floats > 1 as byte counts,
    # which must not be shrunk by the available/total RAM ratio.
    if isinstance(memory_limit, float) and memory_limit <= 1:
        # Convert a fraction of *total* RAM (Dask's interpretation) into a
        # fraction of the RAM that is currently *available*.
        memory_limit *= get_available_ram() / get_total_ram()

    cluster = LocalCluster(
        n_workers=n_workers, threads_per_worker=1, memory_limit=memory_limit
    )
    client = Client(cluster)
    return client