"""
This submodule has only one class Broker. This one is used to do actions on all the stations together. Mainly only used for updating the DB.
"""
# libraries
import logging
from sqlalchemy import text as sqltxt
from .lib.connections import DB_ENG
from .stations import StationsN, StationsND, StationsT, StationsET
from packaging import version as pv
from . import __version__ as __version__
log = logging.getLogger(__name__)
if not DB_ENG.is_superuser:
raise PermissionError("You are no super user of the Database and therefor the Broker class is not available.")
[docs]class Broker(object):
"""A class to manage and update the database.
Can get used to update all the stations and parameters at once.
This class is only working with super user privileges.
"""
def __init__(self):
self.stations_nd = StationsND()
self.stations_t = StationsT()
self.stations_et = StationsET()
self.stations_n = StationsN()
self.stations = [
self.stations_nd,
self.stations_t,
self.stations_et,
self.stations_n]
def _check_paras(self, paras, valid_paras=["n_d", "n", "t", "et"]):
valid_paras = ["n_d", "n", "t", "et"]
for para in paras:
if para not in valid_paras:
raise ValueError(
"The given parameter {para} is not valid.".format(
para=para))
[docs] def update_raw(self, only_new=True, paras=["n_d", "n", "t", "et"]):
"""Update the raw data from the DWD-CDC server to the database.
Parameters
----------
only_new : bool, optional
Get only the files that are not yet in the database?
If False all the available files are loaded again.
The default is True.
paras : list of str, optional
The parameters for which to do the actions.
Can be one, some or all of ["n_d", "n", "t", "et"].
The default is ["n_d", "n", "t", "et"].
"""
log.info("="*79 + "\nBroker update_raw starts")
self._check_paras(paras)
for stations in self.stations:
if stations._para in paras:
stations.update_raw(only_new=only_new)
[docs] def update_ma(self, paras=["n_d", "n", "t", "et"]):
"""Update the multi-annual data from raster to table.
Parameters
----------
paras : list of str, optional
The parameters for which to do the actions.
Can be one, some or all of ["n_d", "n", "t", "et"].
The default is ["n_d", "n", "t", "et"].
"""
log.info("="*79 + "\nBroker update_ma starts")
self._check_paras(paras)
for stations in self.stations:
if stations._para in paras:
stations.update_ma()
[docs] def quality_check(self, paras=["n", "t", "et"], with_fillup_nd=True):
"""Do the quality check on the stations raw data.
Parameters
----------
paras : list of str, optional
The parameters for which to do the actions.
Can be one, some or all of ["n", "t", "et"].
The default is ["n", "t", "et"].
with_fillup_nd : bool, optional
Should the daily precipitation data get filled up if the 10 minute precipitation data gets quality checked.
The default is True.
"""
self._check_paras(paras=paras, valid_paras=["n", "t", "et"])
log.info("="*79 + "\nBroker quality_check starts")
if with_fillup_nd and "n" in paras:
self.stations_nd.fillup()
for stations in self.stations:
if stations._para in paras:
stations.quality_check()
[docs] def last_imp_quality_check(self, paras=["n", "t", "et"], with_fillup_nd=True):
"""Quality check the last imported data.
Also fills up the daily precipitation data if the 10 minute precipitation data should get quality checked.
Parameters
----------
paras : list of str, optional
The parameters for which to do the actions.
Can be one, some or all of ["n", "t", "et"].
The default is ["n", "t", "et"].
with_fillup_nd : bool, optional
Should the daily precipitation data get filled up if the 10 minute precipitation data gets quality checked.
The default is True.
"""
log.info("="*79 + "\nBroker last_imp_quality_check starts")
self._check_paras(
paras=paras,
valid_paras=["n", "t", "et"])
if with_fillup_nd and "n" in paras:
self.stations_nd.last_imp_fillup()
for stations in self.stations:
if stations._para in paras:
stations.last_imp_quality_check()
[docs] def fillup(self, paras=["n", "t", "et"]):
"""Fillup the timeseries.
Parameters
----------
paras : list of str, optional
The parameters for which to do the actions.
Can be one, some or all of ["n_d", "n", "t", "et"].
The default is ["n_d", "n", "t", "et"].
"""
log.info("="*79 + "\nBroker fillup starts")
self._check_paras(paras)
for stations in self.stations:
if stations._para in paras:
stations.fillup()
[docs] def last_imp_fillup(self, paras=["n", "t", "et"]):
"""Fillup the last imported data.
Parameters
----------
paras : list of str, optional
The parameters for which to do the actions.
Can be one, some or all of ["n_d", "n", "t", "et"].
The default is ["n_d", "n", "t", "et"].
"""
log.info("="*79 + "\nBroker last_imp_fillup starts")
self._check_paras(paras)
for stations in self.stations:
if stations._para in paras:
stations.last_imp_fillup()
[docs] def richter_correct(self):
"""Richter correct all of the precipitation data.
"""
log.info("="*79 + "\nBroker: last_imp_corr starts")
self.stations_n.richter_correct()
[docs] def last_imp_corr(self):
"""Richter correct the last imported precipitation data.
"""
log.info("="*79 + "\nBroker: last_imp_corr starts")
self.stations_n.last_imp_corr()
[docs] def update_db(self, paras=["n_d", "n", "t", "et"]):
"""The regular Update of the database.
Downloads new data.
Quality checks the newly imported data.
Fills up the newly imported data.
Parameters
----------
paras : list of str, optional
The parameters for which to do the actions.
Can be one, some or all of ["n_d", "n", "t", "et"].
The default is ["n_d", "n", "t", "et"].
"""
log.info("="*79 + "\nBroker update_db starts")
self._check_paras(paras)
self.check_is_broker_active()
if pv.parse(__version__) > self.get_db_version():
log.info("--> There is a new version of the python script. Therefor the database is recalculated completly")
self.initiate_db()
else:
self.update_meta(paras=paras)
self.update_raw(paras=paras)
if "n_d" in paras:
paras.remove("n_d")
self.last_imp_quality_check(paras=paras)
self.last_imp_fillup(paras=paras)
self.last_imp_corr()
self.set_is_broker_active(False)
[docs] def initiate_db(self):
"""Initiate the Database.
Downloads all the data from the CDC server for the first time.
Updates the multi-annual data and the richter-class for all the stations.
Quality checks and fills up the timeseries.
"""
log.info("="*79 + "\nBroker initiate_db starts")
self.check_is_broker_active()
self.update_meta(
paras=["n_d", "n", "t", "et"])
self.update_raw(
paras=["n_d", "n", "t", "et"],
only_new=False)
self.update_ma(
paras=["n_d", "n", "t", "et"])
self.stations_n.update_richter_class()
self.quality_check(paras=["n", "t", "et"])
self.fillup(paras=["n", "t", "et"])
self.richter_correct()
self.set_db_version()
self.set_is_broker_active(False)
[docs] def vacuum(self, do_analyze=True):
sql = "VACUUM {anlyze};".format(
analyze="ANALYZE" if do_analyze else "")
with DB_ENG.connect() as con:
con.execute(sqltxt(sql))
[docs] def get_setting(self, key):
"""Get a specific settings value.
Parameters
----------
key : str
The key of the setting.
Returns
-------
value: str
The version of the database.
"""
with DB_ENG.connect() as con:
res = con.execute(
sqltxt(f"SELECT value FROM settings WHERE key='{key}';")
).fetchone()
if res is None:
return None
else:
return res[0]
[docs] def set_setting(self, key:str, value:str):
"""Set a specific setting.
Parameters
----------
key : str
The key of the setting.
value : str
The value of the setting.
"""
with DB_ENG.connect() as con:
con.execute(sqltxt(
f"""INSERT INTO settings
VALUES ('{key}', '{value}')
ON CONFLICT (key)
DO UPDATE SET value=EXCLUDED.value;"""))
[docs] def get_db_version(self):
"""Get the package version that the databases state is at.
Returns
-------
version
The version of the database.
"""
res = self.get_setting("version")
if res is not None:
res = pv.parse(res)
return res
[docs] def set_db_version(self, version=pv.parse(__version__)):
"""Set the package version that the databases state is at.
Parameters
----------
version: pv.Version, optional
The Version of the python package
The default is the version of this package.
"""
if not isinstance(version, pv.Version):
raise TypeError("version must be of type pv.Version")
self.set_setting("version", str(version))
[docs] def set_is_broker_active(self, is_active:bool):
"""Set the state of the broker.
Parameters
----------
is_active : bool
Whether the broker is active.
"""
self.set_setting("is_broker_active", str(is_active))
[docs] def get_is_broker_active(self):
"""Get the state of the broker.
Returns
-------
bool
Whether the broker is active.
"""
return self.get_setting("is_broker_active") == "True"
[docs] def check_is_broker_active(self):
"""Check if another broker instance is active and if so raise an error.
Raises
------
RuntimeError
If the broker is not active.
"""
if self.get_is_broker_active():
raise RuntimeError("Another Broker is active and therefor this broker is not allowed to run.")
else:
self.set_is_broker_active(True)