Source code for weatherdb.station.StationP

# libraries
import logging
from datetime import timedelta
from pathlib import Path
import warnings
import numpy as np
import pandas as pd
import sqlalchemy as sa
from sqlalchemy import text as sqltxt
from functools import cached_property
import rasterio as rio
import rasterio.mask
import pyproj
from shapely.ops import transform as shp_transform
from shapely import distance, Point

from ..db.connections import db_engine
from ..utils.dwd import  dwd_id_to_str
from ..utils.TimestampPeriod import TimestampPeriod
from ..utils.geometry import polar_line, raster2points
from ..config import config
from ..db.models import MetaP
from .StationBases import StationPBase
from .StationPD import StationPD
from .StationT import StationT

# set settings
# ############
__all__ = ["StationP"]
log = logging.getLogger(__name__)

# variables
RICHTER_CLASSES = {
    "no-protection": {
        "min_horizon": 0,
        "max_horizon": 3
    },
    "little-protection": {
        "min_horizon": 3,
        "max_horizon": 7
    },
    "protected": {
        "min_horizon": 7,
        "max_horizon": 12
    },
    "heavy-protection": {
        "min_horizon": 12,
        "max_horizon": np.inf
    },
}

# class definition
##################
[docs] class StationP(StationPBase): """A class to work with and download 10 minutes precipitation data for one station.""" # common settings _MetaModel = MetaP _para = "p" _para_base = _para _para_long = "Precipitation" _unit = "mm/10min" _valid_kinds = {"raw", "qn", "qc", "corr", "filled", "filled_by"} _best_kind = "corr" # cdc dwd parameters _ftp_folder_base = [ "climate_environment/CDC/observations_germany/climate/10_minutes/precipitation/"] _cdc_col_names_imp = ["RWS_10", "QN"] _db_col_names_imp = ["raw", "qn"] # timestamp configurations _tstp_format_db = "%Y%m%d %H:%M" _tstp_dtype = "timestamp" _interval = "10 min" # aggregation _min_agg_to = "10 min" def __init__(self, id, **kwargs): super().__init__(id, **kwargs) self.id_str = dwd_id_to_str(id) def _get_sql_new_qc(self, period): # create sql_format_dict sql_format_dict = dict( para=self._para, stid=self.id, para_long=self._para_long, decim=self._decimals, **period.get_sql_format_dict( format=f"'{self._tstp_format_db}'"), limit=0.1*self._decimals) # don't delete values below 0.1mm/10min if they are consecutive # check if daily station is available sql_check_d = f""" SELECT EXISTS( SELECT * FROM information_schema.tables WHERE table_schema = 'timeseries' AND table_name = '{self.id}_{self._para}_d' );""" with db_engine.connect() as con: daily_exists = con.execute(sqltxt(sql_check_d)).first()[0] # create sql for dates where the aggregated 10 minutes measurements are 0 # althought the daily measurements are not 0 # or where the aggregated daily sum is more than the double of the daily measurement, when the daily measurement is more than 10 mm if daily_exists: sql_dates_failed = """ WITH ts_10min_d AS ( SELECT (ts.timestamp - INTERVAL '6h')::date as date, sum("raw") as raw FROM timeseries."{stid}_{para}" ts WHERE ts.timestamp BETWEEN {min_tstp} AND {max_tstp} GROUP BY (ts.timestamp - INTERVAL '6h')::date) SELECT date FROM timeseries."{stid}_{para}_d" ts_d LEFT JOIN ts_10min_d ON ts_d.timestamp::date=ts_10min_d.date WHERE ts_d.timestamp BETWEEN {min_tstp}::date AND {max_tstp}::date AND ((ts_10min_d.raw = 0 AND ts_d.raw <> 0) OR (ts_10min_d.raw >= 10*{decim} AND ts_10min_d.raw >= (ts_d.raw*2))) """.format(**sql_format_dict) else: log.error(( "For the {para_long} station with ID {stid} there is no timeserie with daily values. " + "Therefor the quality check for daily values equal to 0 is not done.\n" + "Please consider updating the daily stations with:\n" + "stats = stations.StationsPD()\n" + "stats.update_meta()\nstats.update_raw()" ).format(**sql_format_dict)) sql_dates_failed = """ SELECT NULL::date as date """ # remove single peaks above 5mm/10min sql_single_peaks = """ SELECT ts.timestamp FROM timeseries."{stid}_{para}" ts INNER JOIN timeseries."{stid}_{para}" tsb ON ts.timestamp = tsb.timestamp - INTERVAL '10 min' INNER JOIN timeseries."{stid}_{para}" tsa ON ts.timestamp = tsa.timestamp + INTERVAL '10 min' WHERE ts.raw > (5*{decim}) AND tsb.raw = 0 AND tsa.raw = 0 AND ts.timestamp BETWEEN {min_tstp} AND {max_tstp} """.format(**sql_format_dict) # make sql for timestamps where 3 times same value in a row sql_tstps_failed = """ WITH tstps_df as ( SELECT ts.timestamp as tstp_1, ts2.timestamp as tstp_2, ts3.timestamp as tstp_3 from timeseries."{stid}_{para}" ts INNER JOIN timeseries."{stid}_{para}" ts2 on ts.timestamp = ts2.timestamp - INTERVAL '10 min' INNER JOIN timeseries."{stid}_{para}" ts3 on ts.timestamp = ts3.timestamp - INTERVAL '20 min' WHERE ts.qn != 3 AND ts.raw = ts2.raw AND ts2.raw = ts3.raw AND ts.raw > {limit:n} AND ts.raw is not NULL AND ts.timestamp BETWEEN {min_tstp} AND {max_tstp} ) SELECT tstp_1 AS timestamp FROM tstps_df UNION SELECT tstp_2 FROM tstps_df UNION SELECT tstp_3 FROM tstps_df UNION ({sql_single_peaks}) """.format(sql_single_peaks=sql_single_peaks, **sql_format_dict) # create sql for new qc values sql_new_qc = """ WITH tstps_failed as ({sql_tstps_failed}), dates_failed AS ({sql_dates_failed}) SELECT ts.timestamp, (CASE WHEN ((ts.timestamp IN (SELECT timestamp FROM tstps_failed)) OR ((ts.timestamp - INTERVAL '6h')::date IN ( SELECT date FROM dates_failed)) OR ts."raw" < 0 OR ts."raw" >= 50*{decim}) THEN NULL ELSE ts."raw" END) as qc FROM timeseries."{stid}_{para}" ts WHERE ts.timestamp BETWEEN {min_tstp} AND {max_tstp} """.format( sql_tstps_failed=sql_tstps_failed, sql_dates_failed=sql_dates_failed, **sql_format_dict) return sql_new_qc def _check_df_raw(self, df): """This function is used in the Base class on the single dataframe that is downloaded from the CDC Server before loading it in the database. Here the function adapts the timezone relative to the date. As the data on the CDC server is in MEZ before 200 and in UTC after 2000 Some precipitation stations on the DWD CDC server have also rows outside of the normal 10 Minute frequency, e.g. 2008-09-16 01:47 for Station 662. Because those rows only have NAs for the measurement they are deleted.""" # correct Timezone before 2000 -> CEWT after 2000 -> UTC if df.index.tzinfo is not None: df.index = df.index.tz_localize(None) # some are already falsy localized if df.index.min()>= pd.Timestamp(1999,12,31,23,0): df.index = df.index.tz_localize("UTC") elif df.index.max() < pd.Timestamp(2000,1,1,1,0): df.index = df.index.tz_localize("Etc/GMT-1").tz_convert("UTC") else: raise ValueError("The timezone could not get defined for the given import." + str(df.head())) # delete measurements outside of the 10 minutes frequency df = df[df.index.minute%10==0].copy() # check if duplicates and try to remove if df.index.has_duplicates: df = df.reset_index()\ .groupby(self._cdc_col_names_imp + [self._cdc_date_col])\ .first()\ .reset_index().set_index(self._cdc_date_col) if df.index.has_duplicates: raise ValueError("There are duplicates in the DWD data that couldn't get removed.") # set frequency to 10 minutes df = df.asfreq("10min") # delete measurements below 0 n_col = self._cdc_col_names_imp[self._db_col_names_imp.index("raw")] df.loc[df[n_col]<0, n_col] = np.nan return df @cached_property def _table(self): return sa.table( f"{self.id}_{self._para}", sa.column("timestamp", sa.DateTime), sa.column("raw", sa.Integer), sa.column("qc", sa.Integer), sa.column("filled", sa.Integer), sa.column("filled_by", sa.SmallInteger), sa.column("corr", sa.Integer), schema="timeseries") @db_engine.deco_create_privilege def _create_timeseries_table(self): """Create the timeseries table in the DB if it is not yet existing.""" sql_add_table = f''' CREATE TABLE IF NOT EXISTS timeseries."{self.id}_{self._para}" ( timestamp timestamp PRIMARY KEY, raw int4, qn smallint, qc int4, filled int4, filled_by int2, corr int4 ); ''' with db_engine.connect() as con: con.execute(sqltxt(sql_add_table)) con.commit() @staticmethod def _check_period_extra(period): """Additional checks on period used in StationBase class _check_period method.""" # add time to period if given as date return period.expand_to_timestamp() @staticmethod def _richter_class_from_horizon(horizon): richter_class = None for key in RICHTER_CLASSES: if (horizon >= RICHTER_CLASSES[key]["min_horizon"]) and \ (horizon < RICHTER_CLASSES[key]["max_horizon"]): richter_class = key return richter_class
[docs] @db_engine.deco_update_privilege def update_horizon(self, skip_if_exist=True, **kwargs): """Update the horizon angle (Horizontabschirmung) in the meta table. Get new values from the raster and put in the table. Parameters ---------- skip_if_exist : bool, optional Skip updating the value if there is already a value in the meta table. The default is True. **kwargs : dict, optional Additional keyword arguments catch all, but unused here. Returns ------- float The horizon angle in degrees (Horizontabschirmung). """ if skip_if_exist: horizon = self.get_horizon() if horizon is not None: return horizon # check if files are available dem_files = [Path(file) for file in config.get_list("data:rasters", "dems")] for dem_file in dem_files: if not dem_file.is_file(): raise ValueError( f"The DEM file was not found in the data directory under: \n{dem_file}") # get the horizon configurations radius = int(config.get("weatherdb", "horizon_radius")) proj_crs = pyproj.CRS.from_epsg(config.get("weatherdb", "horizon_crs")) if not proj_crs.is_projected: raise ValueError( "The CRS for the horizon calculation is not projected. " + "Please define a projected CRS in the config file under weatherdb.horizon_crs.") # get raster basic information dem_infos = {} for dem_file in dem_files: with rio.open(dem_file) as dem: # get station position stat_h, stat_tr = rasterio.mask.mask( dem, [self.get_geom(crs=dem.crs)], crop=True, all_touched=False, nodata=np.nan) stat_geom = raster2points(stat_h, stat_tr, crs=dem.crs)\ .to_crs(proj_crs)\ .iloc[0].geometry xy = [stat_geom.x, stat_geom.y] stat_h = stat_h.flatten()[0] # get CRS transformer tr_to_dem = pyproj.Transformer.from_crs(proj_crs, dem.crs, always_xy=True) tr_from_dem = pyproj.Transformer.from_crs(dem.crs, proj_crs, always_xy=True) # get typical distance of raster cell points dem_tr = dem.transform p_top_left = (dem_tr.c, dem_tr.f) p_top_right = (dem_tr.c+dem_tr.a*dem.profile["width"], dem_tr.f) p_bottom_left= (dem_tr.c, dem_tr.f+dem_tr.e*dem.profile["height"]) p_bottom_right = (p_top_right[0], p_bottom_left[1]) dists = [] for p in [p_top_left, p_top_right, p_bottom_left, p_bottom_right]: p1 = Point(tr_from_dem.transform(*p)) dists.append(distance( p1, Point(tr_from_dem.transform(p[0] + dem_tr.a, p[1] + dem_tr.e)) )) dem_infos[dem_file] = dict( stat_geom=stat_geom, xy=xy, stat_h=stat_h, tr_to_dem=tr_to_dem, tr_from_dem=tr_from_dem, max_dist = max(dists)*1.2 ) # check if first dem has hight information if np.isnan(dem_infos[dem_files[0]]["stat_h"]): raise ValueError( f"update_horizon(): No height was found in the first dem for {self._para_long} Station {self.id}. " + "Therefor the horizon angle could not get updated.") # get station center point stat_geom = self.get_geom(crs=proj_crs) xy = [stat_geom.x, stat_geom.y] # compute the horizon angle for each western angle hab = pd.Series( index=pd.Index([], name="angle", dtype=int), name="horizon", dtype=float) raise_hole_error = False for angle in range(90, 271, 3): missing_lines = [polar_line(xy, radius, angle)] dem_lines = None for dem_file, dem_info in dem_infos.items(): with rio.open(dem_file) as dem: missing_lines_next = [] for line in missing_lines: # get raster points line_dem = shp_transform( dem_info["tr_to_dem"].transform, line) dem_np, dem_tr = rasterio.mask.mask( dem, [line_dem], crop=True, all_touched=True, nodata=np.nan) dem_line = raster2points(dem_np, dem_tr, crs=dem.crs).to_crs(proj_crs) # calculate the distance to the stations raster cell dem_line["dist"] = dem_line.distance(dem_info["stat_geom"]) dem_line.drop( dem_line.loc[dem_line["dist"]==0].index, inplace=True) dem_line = dem_line.sort_values("dist").reset_index(drop=True) # calculate the horizon angle dem_line["horizon"] = np.degrees(np.arctan( (dem_line["data"]-dem_info["stat_h"]) / dem_line["dist"])) # check if parts are missing and fill ##################################### # look for holes inside the line for i in dem_line[dem_line["dist"].diff() > dem_info["max_dist"]].index: missing_lines_next.append( polar_line(dem_line.loc[i-1, "geometry"].coords[0], dem_line.loc[i, "dist"] - dem_line.loc[i-1, "dist"], angle)) # merge the parts dem_lines = pd.concat( [dem_lines, dem_line], ignore_index=True) # look for missing values at the end dem_max_dist = dem_lines.iloc[-1]["dist"] if dem_max_dist < (radius - dem_info["max_dist"]): missing_lines_next.append( polar_line(dem_lines.iloc[-1]["geometry"].coords[0], radius - dem_max_dist, angle)) # check if no parts are missing if (len(missing_lines_next) == 0): break elif dem_file == dem_files[-1]: raise_hole_error = True else: # create lines for next iteration missing_lines = missing_lines_next hab[angle] = dem_lines["horizon"].max() if raise_hole_error: log.warning( f"Station{self._para}({self.id}).update_horizon(): There were holes in the DEM rasters providen when calculating the horizon angle. Therefor the calculated horizon angle could be faulty, but doesn't have to be, if the station is close to the sea for example.") # calculate the mean "horizontabschimung" # Richter: H’=0,15H(S-SW) +0,35H(SW-W) +0,35H(W-NW) +0, 15H(NW-N) horizon = max(0, 0.15*hab[(hab.index>225) & (hab.index<=270)].mean() + 0.35*hab[(hab.index>=180) & (hab.index<=225)].mean() + 0.35*hab[(hab.index>=135) & (hab.index<180)].mean() + 0.15*hab[(hab.index>=90) & (hab.index<135)].mean()) # insert to meta table in db self._update_meta( cols=["horizon"], values=[horizon]) return horizon
[docs] @db_engine.deco_update_privilege def update_richter_class(self, skip_if_exist=True, **kwargs): """Update the richter class in the meta table. Get new values from the raster and put in the table. Parameters ---------- skip_if_exist : bool, optional Skip updating the value if there is already a value in the meta table. The default is True **kwargs : dict, optional Additional keyword arguments catch all, but unused here. Returns ------- str The richter class name. """ # check if already value in table if skip_if_exist: richter_class = self.get_richter_class() if self.get_richter_class() is not None: return richter_class # get the richter class richter_class = self._richter_class_from_horizon( horizon=self.update_horizon(skip_if_exist=skip_if_exist)) # save to db self._update_meta( cols=["richter_class"], values=[richter_class]) return richter_class
[docs] @db_engine.deco_update_privilege def richter_correct(self, period=(None, None), **kwargs): """Do the richter correction on the filled data for the given period. Parameters ---------- period : TimestampPeriod or (tuple or list of datetime.datetime or None), optional The minimum and maximum Timestamp for which to get the timeseries. If None is given, the maximum or minimal possible Timestamp is taken. The default is (None, None). **kwargs : dict, optional Additional keyword arguments catch all, but unused here. Raises ------ Exception If no richter class was found for this station. """ # check if period is given if not isinstance(period, TimestampPeriod): period = TimestampPeriod(*period) period_in = period.copy() period = self._check_period( period=period, kinds=["filled"]) if not period_in.is_empty(): sql_period_clause = \ "WHERE timestamp BETWEEN {min_tstp} AND {max_tstp}".format( **period.get_sql_format_dict(f"'{self._tstp_format_db}'")) else: sql_period_clause = "" # check if temperature station is filled stat_t = StationT(self.id) stat_t_period = stat_t.get_filled_period(kind="filled") delta = timedelta(hours=5, minutes=50) min_date = config.get_date("weatherdb", "min_date") stat_t_min = stat_t_period[0].date() stat_t_max = stat_t_period[1].date() stat_p_min = (period[0] - delta).date() stat_p_max = (period[1] - delta).date() if stat_t_period.is_empty()\ or (stat_t_min > stat_p_min and not (stat_p_min < min_date) and (stat_t_min == min_date)) \ or (stat_t_max < stat_p_max)\ and not stat_t.is_last_imp_done(kind="filled"): stat_t.fillup(period=period) # get the richter exposition class richter_class = self.update_richter_class(skip_if_exist=True) if richter_class is None: raise Exception(f"No richter class was found for the precipitation station {self.id} and therefor no richter correction was possible.") # create the sql queries sql_format_dict = dict( stid=self.id, para=self._para, richter_class=richter_class, period_clause=sql_period_clause, n_decim=self._decimals, t_decim=stat_t._decimals ) # daily precipitation sql_p_daily = """ SELECT timestamp::date AS date, sum("filled") AS "filled", count(*) FILTER (WHERE "filled" > 0) AS "count_n" FROM timeseries."{stid}_{para}" {period_clause} GROUP BY timestamp::date """.format(**sql_format_dict) # add is_winter sql_p_daily_winter = f""" SELECT date, CASE WHEN EXTRACT(MONTH FROM date) IN (1, 2, 3, 10, 11, 12) THEN true::bool ELSE false::bool END AS is_winter, "filled" AS "p_d", "count_n" FROM ({sql_p_daily}) tsp_d """ # add precipitation class sql_p_daily_precip_class = """ SELECT date, "count_n", "p_d", CASE WHEN (tst."filled" >= (3 * {t_decim}) AND "is_winter") THEN 'precip_winter' WHEN (tst."filled" >= (3 * {t_decim}) AND NOT "is_winter") THEN 'precip_summer' WHEN (tst."filled" <= (-0.7 * {t_decim})::int) THEN 'snow' WHEN (tst."filled" IS NULL) THEN NULL ELSE 'mix' END AS precipitation_typ FROM ({sql_p_daily_winter}) tsp_d_wi LEFT JOIN timeseries."{stid}_t" tst ON tst.timestamp=tsp_d_wi.date """.format( sql_p_daily_winter=sql_p_daily_winter, **sql_format_dict ) # calculate the delta n sql_delta_n = """ SELECT date, CASE WHEN "count_n"> 0 THEN round(("b_{richter_class}" * ("p_d"::float/{n_decim})^"e" * {n_decim})/"count_n")::int ELSE 0 END AS "delta_10min" FROM ({sql_p_daily_precip_class}) tsp_d2 LEFT JOIN richter_parameters r ON r."precipitation_typ"=tsp_d2."precipitation_typ" """.format( sql_p_daily_precip_class=sql_p_daily_precip_class, **sql_format_dict ) # calculate the new corr sql_new_corr = """ SELECT timestamp, CASE WHEN "filled" > 0 THEN ts."filled" + ts_delta_n."delta_10min" ELSE ts."filled" END as corr FROM timeseries."{stid}_{para}" ts LEFT JOIN ({sql_delta_n}) ts_delta_n ON (ts.timestamp)::date = ts_delta_n.date {period_clause} """.format( sql_delta_n=sql_delta_n, **sql_format_dict ) # update the timeseries sql_update = """ UPDATE timeseries."{stid}_{para}" ts SET "corr" = new.corr FROM ({sql_new_corr}) new WHERE ts.timestamp = new.timestamp AND ts.corr is distinct from new.corr; """.format( sql_new_corr=sql_new_corr, **sql_format_dict ) # run commands if "return_sql" in kwargs and kwargs["return_sql"]: return sql_update self._execute_long_sql( sql_update, description="richter corrected for the period {min_tstp} - {max_tstp}".format( **period.get_sql_format_dict(format=self._tstp_format_human) )) # mark last import as done, if previous are ok if (self.is_last_imp_done(kind="qc") and self.is_last_imp_done(kind="filled")): if (period_in.is_empty() or period_in.contains(self.get_last_imp_period())): self._mark_last_imp_done(kind="corr") # update filled time in meta table self.update_period_meta(kind="corr") # update multi annual mean self.update_ma_timeseries(kind="corr")
[docs] @db_engine.deco_update_privilege def corr(self, *args, **kwargs): return self.richter_correct(*args, **kwargs)
[docs] @db_engine.deco_update_privilege def last_imp_richter_correct(self, _last_imp_period=None, **kwargs): """Do the richter correction of the last import. Parameters ---------- _last_imp_period : weatherdb.utils.TimestampPeriod, optional Give the overall period of the last import. This is only for intern use of the stationsP method to not compute over and over again the period. The default is None. **kwargs : dict, optional Additional keyword arguments passed to the richter_correct method. """ if not self.is_last_imp_done(kind="corr"): if _last_imp_period is None: period = self.get_last_imp_period(all=True) else: period = _last_imp_period self.richter_correct( period=period, **kwargs ) else: log.info(f"The last import of {self._para_long} Station {self.id} was already richter corrected and is therefor skiped")
[docs] @db_engine.deco_update_privilege def last_imp_corr(self, _last_imp_period=None, **kwargs): """A wrapper for last_imp_richter_correct().""" return self.last_imp_richter_correct(_last_imp_period=_last_imp_period, **kwargs)
@db_engine.deco_update_privilege def _sql_fillup_extra_dict(self, **kwargs): fillup_extra_dict = super()._sql_fillup_extra_dict(**kwargs) stat_pd = StationPD(self.id) if stat_pd.isin_db() and \ not stat_pd.get_filled_period(kind="filled", from_meta=True).is_empty(): # adjust 10 minutes sum to match measured daily value, # but don't add more than 10mm/10min and don't create single peaks with more than 5mm/min sql_extra = """ UPDATE new_filled_{stid}_{para} ts SET filled = tsnew.filled FROM ( SELECT ts.timestamp, CASE WHEN tsb.filled = 0 AND tsa.filled = 0 THEN LEAST(ts.filled * coef, 5*{decim}) ELSE CASE WHEN ((ts.filled * coef) - ts.filled) <= (10 * {decim}) THEN LEAST(ts.filled * coef, 50*{decim}) ELSE LEAST(ts.filled + (10 * {decim}), 50*{decim}) END END as filled FROM new_filled_{stid}_{para} ts INNER JOIN ( SELECT date, ts_d."raw"/ts_10."filled"::float AS coef FROM ( SELECT date(timestamp - '5h 50min'::INTERVAL), sum(filled) AS filled FROM new_filled_{stid}_{para} GROUP BY date(timestamp - '5h 50min'::INTERVAL) ) ts_10 LEFT JOIN timeseries."{stid}_p_d" ts_d ON ts_10.date=ts_d.timestamp WHERE ts_d."raw" IS NOT NULL AND ts_10.filled > 0 ) df_coef ON (ts.timestamp - '5h 50min'::INTERVAL)::date = df_coef.date AND coef != 1 LEFT JOIN timeseries."{stid}_{para}" tsb ON ts.timestamp = tsb.timestamp - INTERVAL '10 min' LEFT JOIN timeseries."{stid}_{para}" tsa ON ts.timestamp = tsa.timestamp + INTERVAL '10 min' WHERE ts.filled IS NOT NULL ) tsnew WHERE tsnew.timestamp = ts.timestamp; """.format(stid=self.id, para=self._para, decim=self._decimals) fillup_extra_dict.update(dict(sql_extra_after_loop=sql_extra)) else: log.warning(f"StationP({self.id}).fillup: There is no daily timeserie in the database, "+ "therefor the 10 minutes values are not getting adjusted to daily values") return fillup_extra_dict
[docs] def get_corr(self, **kwargs): return self.get_df(kinds=["corr"], **kwargs)
[docs] def get_qn(self, **kwargs): return self.get_df(kinds=["qn"], **kwargs)
[docs] def get_richter_class(self, update_if_fails=True): """Get the richter class for this station. Provide the data from the meta table. Parameters ---------- update_if_fails: bool, optional Should the richter class get updatet if no exposition class is found in the meta table? If False and no exposition class was found None is returned. The default is True. Returns ------- string The corresponding richter exposition class. """ sql = f""" SELECT richter_class FROM meta_{self._para} WHERE station_id = {self.id} """ with db_engine.connect() as con: res = con.execute(sqltxt(sql)).first() # check result if res is None: if update_if_fails: if db_engine.is_superuser: self.update_richter_class() # update_if_fails is False to not get an endless loop return self.get_richter_class(update_if_fails=False) else: warnings.warn("You don't have the permissions to change something on the database.\nTherefor an update of the richter_class is not possible.") return None else: return None else: return res[0]
[docs] def get_horizon(self): """Get the value for the horizon angle. (Horizontabschirmung) This value is defined by Richter (1995) as the mean horizon angle in the west direction as: H’=0,15H(S-SW) +0,35H(SW-W) +0,35H(W-NW) +0, 15H(NW-N) Returns ------- float or None The mean western horizon angle """ return self.get_meta(infos="horizon")