Source code for weatherdb.station.StationT

# libraries
import logging
import sqlalchemy as sa
from sqlalchemy import text as sqltxt
from functools import cached_property

from ..db.connections import db_engine
from ..utils.dwd import dwd_id_to_str
from ..db.models import MetaT
from .StationBases import StationTETBase

# set settings
# ############
# Public API of this module: only the StationT class is exported.
__all__ = ["StationT"]
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

# class definition
##################
class StationT(StationTETBase):
    """A class to work with and download temperature data for one station."""

    # common settings
    _MetaModel = MetaT
    _para = "t"
    _para_base = _para
    _para_long = "Temperature"
    _unit = "°C"
    # scaling factor applied to the thresholds of the quality check below
    # presumably the values are stored as integers in 1/_decimals °C — TODO confirm
    _decimals = 10
    _valid_kinds = {"raw", "raw_min", "raw_max", "qc",
                    "filled", "filled_min", "filled_max", "filled_by"}

    # cdc dwd parameters
    _ftp_folder_base = [
        "climate_environment/CDC/observations_germany/climate/daily/kl/"]
    _cdc_date_col = "MESS_DATUM"
    _cdc_col_names_imp = ["TMK", "TNK", "TXK"]
    _db_col_names_imp = ["raw", "raw_min", "raw_max"]

    # aggregation
    _agg_fun = "avg"

    # for regionalisation
    _ma_terms = ["year"]
    _coef_sign = ["-", "+"]

    # for the fillup
    _filled_by_n = 5
    _fillup_max_dist = 100e3

    def __init__(self, id, **kwargs):
        """Initialise the station and store its DWD id string as ``id_str``.

        Parameters
        ----------
        id : int
            The station id; forwarded to the base class and converted with
            ``dwd_id_to_str``.
        **kwargs
            Forwarded unchanged to the base class initialiser.
        """
        super().__init__(id, **kwargs)
        self.id_str = dwd_id_to_str(id)

    @cached_property
    def _table(self):
        """The sqlalchemy table construct of this station's timeseries.

        The table is named ``<id>_<para>`` and lives in the ``timeseries``
        schema. The construct is built once and cached on the instance.
        """
        # NOTE(review): "filled_by" is declared as SmallInteger here while
        # _create_timeseries_table creates it as smallint[] — confirm that
        # this mismatch is intended.
        return sa.table(
            f"{self.id}_{self._para}",
            sa.column("timestamp", sa.Date),
            sa.column("raw", sa.Integer),
            sa.column("raw_min", sa.Integer),
            sa.column("raw_max", sa.Integer),
            sa.column("qc", sa.Integer),
            sa.column("filled", sa.Integer),
            sa.column("filled_min", sa.Integer),
            sa.column("filled_max", sa.Integer),
            sa.column("filled_by", sa.SmallInteger),
            schema="timeseries")

    def _create_timeseries_table(self):
        """Create the timeseries table in the DB if it is not yet existing."""
        sql_add_table = f'''
            CREATE TABLE IF NOT EXISTS timeseries."{self.id}_{self._para}" (
                timestamp date PRIMARY KEY,
                raw integer NULL DEFAULT NULL,
                raw_min integer NULL DEFAULT NULL,
                raw_max integer NULL DEFAULT NULL,
                qc integer NULL DEFAULT NULL,
                filled integer NULL DEFAULT NULL,
                filled_min integer NULL DEFAULT NULL,
                filled_max integer NULL DEFAULT NULL,
                filled_by smallint[{self._filled_by_n}] NULL DEFAULT NULL
                );
        '''
        with db_engine.connect() as con:
            con.execute(sqltxt(sql_add_table))
            con.commit()

    def _get_sql_new_qc(self, period):
        """Build the SQL query that computes the quality checked values.

        The raw value is compared with the median of the neighbouring
        stations (column ``diff`` = raw - nbs_median). Rows failing the
        check get ``NULL`` as ``qc``, all others keep their raw value.

        Parameters
        ----------
        period : object
            The period to check; passed on to ``_get_sql_near_median``.

        Returns
        -------
        str
            A SQL query returning the columns ``timestamp`` and ``qc``.
        """
        # inversion possible?
        # presumably "stationshoehe" is the station elevation in m — TODO confirm
        do_invers = self.get_meta(infos=["stationshoehe"]) > 800

        sql_nears = self._get_sql_near_median(
            period=period, only_real=False, add_is_winter=do_invers,
            extra_cols="raw-nbs_median AS diff")

        if do_invers:
            # with inversion: in winter only values that are too low
            # compared to the neighbours are rejected
            # NOTE(review): the absolute range check on raw is only part of
            # this branch — confirm whether the other branch needs it too.
            sql_null_case = (
                "CASE WHEN (winter) THEN "
                f"diff < {-5 * self._decimals} ELSE "
                f"ABS(diff) > {5 * self._decimals} END "
                f"OR raw < {-50 * self._decimals} OR raw > {50 * self._decimals}")
        else:
            # without inversion: symmetric check in both directions
            sql_null_case = f"ABS(diff) > {5 * self._decimals}"

        # create sql for new qc
        sql_new_qc = f"""
            WITH nears AS ({sql_nears})
            SELECT
                timestamp,
                (CASE WHEN ({sql_null_case})
                    THEN NULL
                    ELSE nears."raw" END) as qc
            FROM nears
        """

        return sql_new_qc

    @db_engine.deco_update_privilege
    def _sql_fillup_extra_dict(self, **kwargs):
        """Extend the fillup SQL parts of the base class by min and max columns.

        Parameters
        ----------
        **kwargs
            Forwarded unchanged to the base class implementation.

        Returns
        -------
        dict
            The dictionary of the base class, updated with the SQL fragments
            needed to fill ``filled_min`` and ``filled_max``.
        """
        # additional parts to calculate the filling of min and max
        fillup_extra_dict = super()._sql_fillup_extra_dict(**kwargs)
        # one NULL slot per neighbour that can be used for the fillup
        sql_array_init = "ARRAY[{0}]".format(
            ", ".join(["NULL::smallint"] * self._filled_by_n))
        fillup_extra_dict.update({
            "extra_new_temp_cols": "raw_min AS filled_min, raw_max AS filled_max," +
                f"{sql_array_init} AS nb_min, {sql_array_init} AS nb_max,",
            "extra_cols_fillup_calc": "filled_min=round(nb.raw_min + %3$s, 0)::int, " +
                "filled_max=round(nb.raw_max + %3$s, 0)::int, ",
            "extra_cols_fillup": "filled_min = new.filled_min, " +
                "filled_max = new.filled_max, ",
            "extra_fillup_where": ' OR ts."filled_min" IS DISTINCT FROM new."filled_min"' +
                ' OR ts."filled_max" IS DISTINCT FROM new."filled_max"',
            "extra_exec_cols": "nb_max[{i}]=round(nb.raw_max + %3$s, 0)::int," +
                "nb_min[{i}]=round(nb.raw_min + %3$s, 0)::int,",
            "extra_fillup_update_where": ' OR nf."filled_min" IS NULL OR nf."filled_max" IS NULL',
            # after the loop the median of the collected neighbour values is taken
            "extra_after_loop_extra_col": """,
                filled_min=(SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY v)
                            FROM unnest(nb_min) as T(v)),
                filled_max=(SELECT percentile_cont(0.5) WITHIN GROUP (ORDER BY v)
                            FROM unnest(nb_max) as T(v))"""})
        return fillup_extra_dict

    def get_multi_annual_raster(self):
        """Get the multi annual raster values of the base class, divided by 10.

        Returns
        -------
        list or None
            The values of the base class, each divided by 10, or None if the
            base class returned None.
        """
        mas = super().get_multi_annual_raster()
        if mas is None:
            return None
        # NOTE(review): the literal 10 equals _decimals — presumably the
        # same scaling is meant; confirm before coupling the two.
        return [ma / 10 for ma in mas]

    def get_adj(self, **kwargs):
        """Get the timeseries adjusted to the multi annual value.

        The difference between the first multi annual value and the mean of
        the yearly means is added to the timeseries; the result is rounded
        to one decimal and stored in the column ``adj``.

        Parameters
        ----------
        **kwargs
            Forwarded unchanged to the base class implementation.

        Returns
        -------
        object
            The ``adj_df`` object of the base class with the added ``adj``
            column (presumably a pandas.DataFrame — TODO confirm).
        """
        main_df, adj_df, ma, main_df_tr = super().get_adj(**kwargs)

        # calculate the mean over the yearly means
        main_df_y = main_df.groupby(main_df_tr.index.year).mean().mean()

        adj_df["adj"] = (main_df + (ma[0] - main_df_y)).round(1)

        return adj_df

    def get_quotient(self, **kwargs):
        """Not available for temperature.

        Raises
        ------
        NotImplementedError
            Always, as the quotient is not implemented for temperature.
        """
        raise NotImplementedError("The quotient is not yet implemented for temperature.")