import sqlalchemy as sa
from sqlalchemy.sql import func
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.orm import DeclarativeBase
from typing_extensions import Annotated
from datetime import timedelta, timezone
from typing import Optional
from geoalchemy2 import Geometry
__all__ = [
"MetaP",
"MetaPD",
"MetaET",
"MetaT",
"RawFiles",
"DroppedStations",
"ParameterVariables",
"RichterValues",
"StationMATimeserie",
"StationMARaster",
"NeededDownloadTime",
"Settings"
]
# define custom types
sint = Annotated[int, 2]
str3 = Annotated[str, 3]
str4 = Annotated[str, 4]
str7 = Annotated[str, 7]
str30 = Annotated[str, 30]
str50 = Annotated[str, 50]
class UTCDateTime(sa.types.TypeDecorator):
impl = sa.types.DateTime
cache_ok = True
def process_bind_param(self, value, engine):
if value is None:
return
if value.utcoffset() is None:
raise ValueError(
'Got naive datetime while timezone-aware is expected'
)
return value.astimezone(timezone.utc)
def process_result_value(self, value, engine):
if value is not None:
return value.replace(tzinfo=timezone.utc)
# define base class for all database tables
[docs]
class ModelBase(DeclarativeBase):
registry = sa.orm.registry(
type_annotation_map={
sint: sa.SmallInteger(),
UTCDateTime: UTCDateTime(),
float: sa.Float(),
str3: sa.VARCHAR(3),
str4: sa.VARCHAR(4),
str7: sa.VARCHAR(7),
str30: sa.VARCHAR(30),
str50: sa.VARCHAR(50),
}
)
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
if hasattr(cls, "load_fixtures"):
sa.event.listen(cls.metadata, 'after_create', cls.load_fixtures)
# define database models
# ----------------------
# declare all database tables
# ---------------------------
[docs]
class RawFiles(ModelBase):
__tablename__ = 'raw_files'
__table_args__ = dict(
schema='public',
comment="The files that got imported from the CDC Server.")
parameter: Mapped[str3] = mapped_column(
primary_key=True,
comment="The parameter that got downloaded for this file. e.g. t, et, p_d, p",
sort_order=-10)
filepath: Mapped[str] = mapped_column(
primary_key=True,
comment="The filepath on the CDC Server",
sort_order=-8)
modtime: Mapped[UTCDateTime] = mapped_column(
comment="The modification time on the CDC Server of the coresponding file",
sort_order=-5)
[docs]
class DroppedStations(ModelBase):
__tablename__ = 'dropped_stations'
__table_args__ = dict(
schema='public',
comment="This table is there to save the station ids that got dropped, so they wont GET recreated")
station_id: Mapped[int] = mapped_column(
primary_key=True,
comment="The station id that got dropped",
sort_order=-10)
parameter: Mapped[str3] = mapped_column(
primary_key=True,
comment="The parameter (p,t,et,p_d) of the station that got dropped",
sort_order=-9)
why: Mapped[str] = mapped_column(
sa.Text(),
comment="The reason why the station got dropped",
sort_order=-8)
timestamp: Mapped[UTCDateTime] = mapped_column(
server_default=func.now(),
comment="The timestamp when the station got dropped",
sort_order=-7)
[docs]
class ParameterVariables(ModelBase):
__tablename__ = 'parameter_variables'
__table_args__ = dict(
schema='public',
comment="This table is there to save specific variables that are nescesary for the functioning of the scripts")
parameter: Mapped[str3] = mapped_column(
primary_key=True,
comment="The parameter for which the variables are valid. e.g. p/p_d/t/et.",
sort_order=-10)
start_tstp_last_imp: Mapped[Optional[UTCDateTime]] = mapped_column(
comment="At what timestamp did the last complete import start. This is then the maximum timestamp for which to expand the timeseries to.",
sort_order=-9)
max_tstp_last_imp: Mapped[Optional[UTCDateTime]] = mapped_column(
comment="The maximal timestamp of the last imports raw data of all the timeseries",
sort_order=-8)
class RichterParameters(ModelBase):
__tablename__ = 'richter_parameters'
__table_args__ = dict(
schema='public',
comment="The Richter parameter values for the equation.")
precipitation_typ: Mapped[str] = mapped_column(
sa.Text(),
primary_key=True,
comment="The type of precipitation. e.g. 'Schnee', 'Regen', ...",
sort_order=-10)
e: Mapped[float] = mapped_column(
comment="The e-value of the equation.",
sort_order=-8)
b_no_protection: Mapped[float] = mapped_column(
name="b_no-protection",
comment="The b-value of the equation for exposition class 'no protection'.",
sort_order=-5)
b_little_protection: Mapped[float] = mapped_column(
name="b_little-protection",
comment="The b-value of the equation for exposition class 'little protection'.",
sort_order=-4)
b_protected: Mapped[float] = mapped_column(
comment="The b-value of the equation for exposition class 'protected'.",
sort_order=-3)
b_heavy_protection: Mapped[float] = mapped_column(
name="b_heavy-protection",
comment="The b-value of the equation for exposition class 'heavy protection'.",
sort_order=-2)
@classmethod
def load_fixtures(cls, target, connection, *args, **kwargs):
from pathlib import Path
from json import load
fix_dir = Path(__file__).parent.joinpath("fixtures")
with open(fix_dir/"RichterParameters.json") as f:
data = load(f)
if connection.execute(sa.sql.select(sa.func.count("*")).select_from(cls.__table__)).scalar() == 0:
connection.execute(cls.__table__.insert(), data)
connection.commit()
# sa.event.listen(RichterParameters.__table__, 'after_create', RichterParameters.load_fixtures)
[docs]
class StationMATimeserie(ModelBase):
__tablename__ = 'station_ma_timeserie'
__table_args__ = dict(
schema='public',
comment="The multi annual mean values of the stations timeseries for the maximum available timespan.")
station_id: Mapped[int] = mapped_column(
primary_key=True,
comment="The DWD-ID of the station.",
sort_order=-10)
parameter: Mapped[str3] = mapped_column(
primary_key=True,
comment="The parameter of the station. e.g. 'P', 'T', 'ET'",
sort_order=-9)
kind: Mapped[str] = mapped_column(
primary_key=True,
comment="The kind of the timeserie. e.g. 'raw', 'filled', 'corr'",
sort_order=-8)
value: Mapped[int] = mapped_column(
comment="The multi annual value of the yearly mean value of the station to the multi annual mean value of the raster.",
sort_order=1)
[docs]
class StationMARaster(ModelBase):
__tablename__ = 'station_ma_raster'
__table_args__ = dict(
schema='public',
comment="The multi annual climate raster values for each station.")
station_id: Mapped[int] = mapped_column(
primary_key=True,
comment="The DWD-ID of the station.",
sort_order=-10)
raster_key: Mapped[str7] = mapped_column(
primary_key=True,
comment="The name of the raster. e.g. 'dwd' or 'hyras'",
sort_order=-9)
parameter: Mapped[str3] = mapped_column(
primary_key=True,
comment="The parameter of the raster. e.g. 'p', 't', 'et'",
sort_order=-8)
term: Mapped[str4] = mapped_column(
primary_key=True,
comment="The term of the raster. e.g. 'year', 'wihy', 'suhy'",
sort_order=-7)
value: Mapped[int] = mapped_column(
comment="The value of the raster for the station in the database unit.",
sort_order=1)
distance: Mapped[int] = mapped_column(
comment="The distance of the station to the raster value in meters.",
sort_order=2)
[docs]
class NeededDownloadTime(ModelBase):
__tablename__ = 'needed_download_time'
__table_args__ = dict(
schema='public',
comment="Saves the time needed to save the timeseries. This helps predicting download time")
timestamp: Mapped[UTCDateTime] = mapped_column(
server_default=func.now(),
primary_key=True,
comment="The timestamp when the download hapend.",
sort_order=-5)
quantity: Mapped[int] = mapped_column(
comment="The number of stations that got downloaded",
sort_order=1)
aggregate: Mapped[str] = mapped_column(
comment="The chosen aggregation. e.g. hourly, 10min, daily, ...",
sort_order=2)
timespan: Mapped[timedelta] = mapped_column(
sa.Interval(),
comment="The timespan of the downloaded timeseries. e.g. 2 years",
sort_order=3)
zip: Mapped[bool] = mapped_column(
comment="Was the download zipped?",
sort_order=4)
pc: Mapped[str] = mapped_column(
comment="The name of the pc that downloaded the timeseries.",
sort_order=5)
duration: Mapped[timedelta] = mapped_column(
sa.Interval(),
comment="The needed time to download and create the timeserie",
sort_order=6)
output_size: Mapped[int] = mapped_column(
comment="The size of the created output file in bytes",
sort_order=7)
[docs]
class Settings(ModelBase):
__tablename__ = 'settings'
__table_args__ = dict(
schema='public',
comment="This table saves settings values for the script-databse connection. E.G. the latest package version that updated the database.")
key: Mapped[str] = mapped_column(
sa.String(20),
primary_key=True,
comment="The key of the setting",
sort_order=0)
value: Mapped[str] = mapped_column(
sa.String(60),
comment="The value of the setting",
sort_order=1)