diff --git a/app/indicators/dataset.py b/app/indicators/dataset.py new file mode 100644 index 0000000000000000000000000000000000000000..d733ab8e6e8f41a1f81a68c76c7d3810d9e6ef30 --- /dev/null +++ b/app/indicators/dataset.py @@ -0,0 +1,22 @@ +from dataclasses import dataclass + +from app.scenarios.historical import HISTORICAL +from app.scenarios.ssp import SSP1_2_6, SSP2_4_5, SSP5_8_5 +from app.time.period import Period, YearRange + + +@dataclass +class Dataset: + name: str + available_periods: dict[str, Period] + + +CMIP6_PROJECTIONS = Dataset( + name="projections-cmip6", + available_periods={ + HISTORICAL.name: YearRange(1950, 65), + SSP1_2_6.name: YearRange(2015, 86), + SSP2_4_5.name: YearRange(2015, 86), + SSP5_8_5.name: YearRange(2015, 86), + }, +) diff --git a/app/indicators/indicator.py b/app/indicators/indicator.py index f8d576bffcd56372ca2b8aee1b12406185309413..0d3b5abbec6d551d2bb29cc7dae2fa0d4ff738d0 100644 --- a/app/indicators/indicator.py +++ b/app/indicators/indicator.py @@ -1,13 +1,29 @@ +import os +from collections.abc import Iterable +from itertools import product +from zipfile import ZipFile + +import cdsapi as _cdsapi + +from app.indicators.dataset import Dataset from app.models import Model from app.regions import Region from app.scenarios.scenario import Scenario as _Scenario from app.time.period import Period from app.units import Q_ +from config import BASE_DATA_DIR class Indicator: - def __init__(self, name: str, variable_name: str, temporal_resolution: Period): + def __init__( + self, + name: str, + dataset: Dataset, + variable_name: str, + temporal_resolution: Period, + ): self.name = name + self.dataset = dataset self.variable_name = variable_name self.temporal_resolution = temporal_resolution @@ -20,6 +36,15 @@ class Indicator: def name(self, name: str): self._name = name + @property + def dataset(self) -> Dataset: + """The name of the copernicus dataset""" + return self._dataset + + @dataset.setter + def dataset(self, dataset: Dataset): + self._dataset = dataset + @property def variable_name(self) -> str: """The copernicus name of the indicator's variable""" @@ -41,26 +66,199 @@ class Indicator: def serialize(self, depth: int = 1): return {"name": self.name, "temporal_resolution": self.temporal_resolution} + def auto_period(self, period: Period | None): + if period is not None: + return period + return self.dataset.available_periods[scenario.name] + def make_download_request_dict( - self, scenario: _Scenario, region: Region, period: Period, model: Model + self, + scenario: _Scenario, + region: Region, + model: Model, + period: Period | None = None, ): """Returns the json request to give to the CDS API to download the indicator. Args: scenario: the CMIP/CORDEX scenario to use (e.g. app.scenarios.cmip6.SSP_2_4_4) - region: the sub-region on which the indicator should be downloaded (a pyproj bbox or None) - period: the period over which the indicator should be downloaded + region: the sub-region over which the indicator should be downloaded model: the chosen model to download + period: the period over which the indicator should be downloaded. If None, + will be set to the largest period available. """ + period = self.auto_period(period) + as_dict = { "temporal_resolution": self.temporal_resolution.copernicus_resolution, "experiment": scenario.to_copernicus_name(), "variable": self.variable_name, "format": "zip", + "model": model.to_copernicus_name(), } as_dict |= period.to_compact_dict_resolution(self.temporal_resolution) - if region: - as_dict["area"] = [region.north, region.west, region.south, region.east] + if region.bbox: + bbox = region.bbox + as_dict["area"] = [bbox.north, bbox.west, bbox.south, bbox.east] return as_dict + + def file_path( + self, + scenario: _Scenario, + region: Region, + model: Model, + period: Period | None = None, + file_format: str | None = None, + ): + """Returns the path to the files associated with this indicator. + + Args: + scenario: the CMIP/CORDEX scenario to use (e.g. app.scenarios.cmip6.SSP_2_4_4) + region: the sub-region over which the indicator has been downloaded + model: the model-specific instance of the indicator that has been downloaded + period: the period over which the indicator has been downloaded. If None, + will be set to the largest period available. + """ + period = self.auto_period(period) + + fmt = f".{file_format}" if file_format else "" + return os.path.join( + BASE_DATA_DIR, + self.name, + region.name, + scenario.name, + model.name, + period.describe() + fmt, + ) + + def download_single( + self, + scenario: _Scenario, + region: Region, + model: Model, + period: Period | None = None, + ): + """Downloads a single-scenario, single-region, single-period, single-model + instance of an indicator. Then extracts and rename files following a + systematic name scheme. + + Args: + scenario: the CMIP/CORDEX scenario to use (e.g. app.scenarios.cmip6.SSP_2_4_4) + region: a region over which the indicator should be downloaded + model: the chosen model to download + period: the period over which the indicator should be downloaded. If None, + will be set to the largest period available. + + Returns: + the path to the NetCDF file + """ + period = self.auto_period(period) + + request_dict = self.make_download_request_dict( + scenario=scenario, region=region, period=period, model=model + ) + + cds = _cdsapi.Client() + + zip_path = self.file_path( + scenario=scenario, + region=region, + model=model, + period=period, + file_format="zip", + ) + dir_path = self.file_path( + scenario=scenario, region=region, period=period, model=model + ) + + nc_path = self.file_path( + scenario=scenario, + region=region, + model=model, + period=period, + file_format="nc", + ) + + if not os.path.exists(dir_path): + os.makedirs(dir_path) + + cds.retrieve(self.dataset.name, request_dict, zip_path) + + # extract zip file + with ZipFile(zip_path, "r") as zip_file: + zip_file.extractall(dir_path) + + # remove zip file + os.remove(zip_path) + + # rename extracted netcdf file + for file in os.listdir(dir_path): + if file.endswith(".nc"): + os.rename(os.path.join(dir_path, file), nc_path) + + return nc_path + + def download_bulk( + self, + scenarios: _Scenario | Iterable[_Scenario], + regions: Region | Iterable[Region], + models: Model | Iterable[Model], + periods: Period | Iterable[Period], + ): + """Downloads multiple instances of an indicator. + + Args: + scenario: a CMIP/CORDEX scenario, or an iterable of scenarios to use + (e.g. app.scenarios.cmip6.SSP_2_4_4) + region: a region, or iterable of regions over which the indicator should be downloaded + model: a model, r iterable of models to download + period: the period or periods over which the indicator should be downloaded. If None, + will be set to the largest period available. + + Returns: + the paths to all downladed files""" + if isinstance(scenarios, _Scenario): + scenarios = [scenarios] + + if isinstance(regions, Region): + regions = [regions] + + if isinstance(periods, Period): + periods = [periods] + + if isinstance(models, Model): + models = [models] + + paths = [] + + for scenario, region, period, model in product( + scenarios, regions, periods, models + ): + paths.append( + self.download_single( + scenario=scenario, region=region, period=period, model=model + ) + ) + return paths + + def download( + self, + scenario: _Scenario | Iterable[_Scenario] | None = None, + region: Region | Iterable[Region] | None = None, + period: Period | Iterable[Period] | None = None, + model: Model | Iterable[Model] | None = None, + scenarios: _Scenario | Iterable[_Scenario] | None = None, + regions: Region | Iterable[Region] | None = None, + periods: Period | Iterable[Period] | None = None, + models: Model | Iterable[Model] | None = None, + ): + scenarios = scenario or scenarios + regions = region or regions + periods = period or periods + models = model or models + + return self.download_bulk( + scenarios=scenarios, regions=regions, periods=periods, models=models + ) diff --git a/app/indicators/processing.py b/app/indicators/processing.py new file mode 100644 index 0000000000000000000000000000000000000000..bf808a022b671d7a692c425dda919507f73675de --- /dev/null +++ b/app/indicators/processing.py @@ -0,0 +1,66 @@ +from collections.abc import Generator +from datetime import datetime, timedelta +from math import ceil, floor, inf + +from netCDF4 import Dataset as _NCDataset + +from app.time.utils import parse_date_in_days_since + + +class NetCDFFile: + def __init__(self, path: str): + self.path = path + self.nc = _NCDataset(self.path) + self.variable_name = self.get_variable_name() + self.extract_coordinates_and_variable() + + def get_variable_name(self): + """Returns the name of the NetCDF variable of this file.""" + return self.nc.variable_id + + def extract_coordinates_and_variable(self): + """Extracts space/time coordinates and variable arrays from a NetCDF file. + + Returns: + a tuple with four arrays : latitutdes, longitudes, time and the variable + """ + var_name = self.get_variable_name() + + self.lat = self.nc.variables["lat"][:] + self.lon = self.nc.variables["lon"][:] + self.time = self.nc.variables["time"][:] + self.variable = self.nc.variables[var_name][:] + + return self.lat, self.lon, self.time, self.variable + + def yield_batch( + self, batch_size: int = 10000, limit=inf + ) -> Generator[list[tuple[float, float, datetime, float]]]: + """Yield `batch_size` variables to put in the database as 4-uples (lat, long, time, variable) + + Args: + batch_size: the size of batches to yield (number of tuples), defaults to 10000 + limit: optional limit of + + """ + initial_date = parse_date_in_days_since(self.nc.parent_time_units) + + n_iter = 0 + accumulator = [] + + for t, days_since in enumerate(self.time): + if n_iter >= limit: + break + + timestamp = initial_date + timedelta(days=days_since) + + for lo, longitude in enumerate(self.lon): + for la, latitude in enumerate(self.lat): + v = self.variable[t, la, lo] + accumulator.append((longitude, latitude, timestamp, v)) + n_iter += 1 + if n_iter % batch_size == 0 and n_iter != 0: + yield accumulator + accumulator = [] + if len(accumulator) != 0: + yield accumulator diff --git a/app/models/cmip6.py b/app/models/cmip6.py index 480db960747e585577d29c4c7a80818ae99a393a..1d7897508ee83012fb04499119e6a7172d7d66fe 100644 --- a/app/models/cmip6.py +++ b/app/models/cmip6.py @@ -1,7 +1,6 @@ -from app.models.model import Model as _Model -from app.models.experiments import Experiment as _Experiment import app.models.institutes as _i - +from app.models.experiments import Experiment as _Experiment +from app.models.model import Model as _Model ACCESS_CM2 = _Model( name="ACCESS-CM2", experiment=_Experiment.CMIP6, institute=_i.ACCESS @@ -50,12 +49,12 @@ CMCC_CM2_SR5 = _Model( ) CMCC_ESM2 = _Model(name="CMCC-ESM2", experiment=_Experiment.CMIP6, institute=_i.CMCC) -CNRM_CM6_1 = _Model(name="CMCC-CM6-1", experiment=_Experiment.CMIP6, institute=_i.CNRM) +CNRM_CM6_1 = _Model(name="CNRM-CM6-1", experiment=_Experiment.CMIP6, institute=_i.CNRM) CNRM_CM6_1_HR = _Model( - name="CMCC-CM6-1-HR", experiment=_Experiment.CMIP6, institute=_i.CNRM + name="CNRM-CM6-1-HR", experiment=_Experiment.CMIP6, institute=_i.CNRM ) CNRM_ESM2_1 = _Model( - name="CMCC-ESM2-1", experiment=_Experiment.CMIP6, institute=_i.CNRM + name="CNRM-ESM2-1", experiment=_Experiment.CMIP6, institute=_i.CNRM ) E3SM_1_0 = _Model(name="E3SM-1-0", experiment=_Experiment.CMIP6, institute=_i.E3SM) diff --git a/app/models/model.py b/app/models/model.py index 96044a943b1adc3b805bae78104acae72c777728..41c3284e10e07c22cb52450e2b2ff88fc8c11044 100644 --- a/app/models/model.py +++ b/app/models/model.py @@ -2,8 +2,8 @@ from dataclasses import dataclass from fastapi import Request -from app.models.institutes import Institute from app.models.experiments import Experiment +from app.models.institutes import Institute @dataclass @@ -39,3 +39,6 @@ class Model: if base_url: s["url"] = f"{base_url}models/{self.flat_name()}" return s + + def to_copernicus_name(self) -> str: + return self.name.lower().replace(".", "_").replace(" ", "_").replace("-", "_") diff --git a/app/time/period.py b/app/time/period.py index ce53747693af4acbcf7859766a0b5450ca278257..1327bab2debf10039282cb8787383ae7e9b5f85d 100644 --- a/app/time/period.py +++ b/app/time/period.py @@ -8,7 +8,6 @@ from datetime import datetime, timedelta from enum import Enum from typing import Type - two_digits_format = lambda x: str(x).zfill(2) ALL_DAYS = [two_digits_format(d) for d in range(1, 31 + 1)] @@ -68,6 +67,7 @@ def timerange( class Period: start: datetime end: datetime + copernicus_resolution: str | None def convert_period(self, resolution: Period | "Resolution") -> Generator[Period]: from app.time.resolution import Resolution # to prevent circular import @@ -129,6 +129,10 @@ class Period: args["first_year"] = args.year return args + @abstractmethod + def describe(self) -> str: + ... + class Day(Period): copernicus_resolution = "daily" @@ -193,6 +197,9 @@ class Day(Period): new_res = res(**self.generic_args()) return new_res.to_dict() + def describe(self) -> str: + return f"{self.year}_{self.month}_{self.day}" + class Month(Period): copernicus_resolution = "monthly" @@ -258,6 +265,9 @@ class Month(Period): new_res = res(**self.generic_args()) return new_res.to_dict() + def describe(self) -> str: + return f"{self.year}_{self.month}" + class Year(Period): copernicus_resolution = "yearly" @@ -310,12 +320,67 @@ class Year(Period): new_res = res(**self.generic_args()) return new_res.to_dict() + def describe(self) -> str: + return f"{self.year}" + + +class YearRange(Period): + copernicus_resolution = None + + def __init__(self, first_year: int, duration: int, **kwargs): + self.duration = duration + self.first_year = first_year + + def step(self, dt: datetime) -> datetime: + return datetime( + dt.year + self.duration, dt.month, dt.day, dt.hour, dt.minute, dt.second + ) + + def _compute_end(self) -> datetime: + return self.step(self.start) - timedelta(days=0, seconds=1) + + @property + def first_year(self) -> int: + return self._first_year + + @first_year.setter + def first_year(self, first_year: int): + self._first_year = first_year + self.start = datetime(first_year, 1, 1) + self.end = self._compute_end() + + @property + def duration(self) -> int: + return self._duration + + @duration.setter + def duration(self, duration: int): + self._duration = duration + + def to_dict(self) -> dict[str, str]: + return { + "year": [ + str(y) for y in range(self.first_year, self.first_year + self.duration) + ] + } + + def to_compact_dict_resolution(self, res: Type[Period]) -> dict[str, str]: + years_dict = self.to_dict() + if res is Day: + return {"day": ALL_DAYS, "month": ALL_MONTHS, **years_dict} + if res is Month: + return {"month": ALL_MONTHS, **years_dict} + return years_dict + + def describe(self) -> str: + return f"{self.first_year}-{self.first_year + self.duration}" + def multi_year_period_factory(step_number: int): class MultiYearPeriodClass(Period): @staticmethod - def from_start_time(dt: datetime) -> FiveYears: - return FiveYears(first_year=dt.year) + def from_start_time(dt: datetime) -> MultiYearPeriodClass: + return MultiYearPeriodClass(first_year=dt.year) def __init__(self, first_year: int, **kwargs): self.first_year = first_year