Source code for weatherdb.db.connections

# libraries
import sqlalchemy
from sqlalchemy import text as sqltxt
from sqlalchemy import URL
import functools

from ..config import config

# DB connection
###############
[docs] class DBEngine: def __init__(self): self._reset_engine() # add the listeners for configuration changes self._config_listener_connection = ( "database", "connection", self._connection_update) self._config_listener_subsection = ( f"database:{config.get('database', 'connection')}", None, self._reset_engine) config.add_listener(*self._config_listener_connection) config.add_listener(*self._config_listener_subsection) def __del__(self): config.remove_listener(*self._config_listener_connection) config.remove_listener(*self._config_listener_subsection) def _connection_update(self): """Listener if another database subsection is selected.""" # first remove the old section listener config.remove_listener(*self._config_listener_subsection) # add the new section listener self._config_listener_subsection = ( f"database:{config.get('database', 'connection')}", None, self._reset_engine) config.add_listener(*self._config_listener_subsection) # create a new engine self.create_engine() # Engine and Session # ################## def _reset_engine(self): """Reset the engine to None to force a recreation on next usage.""" self._engine = None self._session = None self._is_superuser = None self._select_privilege = None self._create_privilege = None self._update_privilege = None self._insert_privilege = None self._delete_privilege = None self._priviledge_to_recheck = set()
[docs] def connect(self, *args, **kwargs): return self.get_engine().connect(*args, **kwargs)
[docs] def get_engine(self): """Get the sqlalchemy database engine. Returns the last created engine if possible or creates a new one. Returns ------- sqlalchemy.engine.base.Engine _description_ """ if self._engine is None: self.create_engine() return self._engine
[docs] def create_engine(self): # create the engine con_key = config.get("database", "connection") con_sect_key = f"database:{con_key}" con_section = config[con_sect_key] user, pwd = config.get_db_credentials() self._engine = sqlalchemy.create_engine( URL.create( drivername="postgresql+psycopg2", username=user, password=pwd, host=con_section["HOST"], database=con_section["DATABASE"], port=con_section["PORT"]), connect_args={ "options": "-c timezone=utc", 'connect_timeout': 10}) self._check_is_superuser() return self._engine
@property def engine(self): return self.get_engine() @engine.setter def engine(self, value): raise PermissionError("You are not allowed to change the engine of the database connection.\nPlease change the configuration to change the database connection.")
[docs] def create_session(self): """Create a new session from the engine. Returns ------- sqlalchemy.orm.session.Session _description_ """ self._session = sqlalchemy.orm.sessionmaker(bind=self.get_engine()) return self._session
[docs] def get_session(self): """Get a session from the engine. Returns ------- sqlalchemy.orm.session.Session _description_ """ if self._session is None: return self.create_session() else: return self._session
@property def session(self): return self.get_session() @session.setter def session(self, value): raise PermissionError("You are not allowed to change the session of the database connection.\nPlease change the configuration to change the database connection.")
[docs] def reload_config(self): """Reload the configuration and create a new engine. """ self.create_engine()
# Privilege checks ################## def _check_is_superuser(self): with self.connect() as con: self._is_superuser = con.execute(sqltxt( "SELECT usesuper FROM pg_user WHERE usename = current_user;" )).first()[0] return self._is_superuser @property def is_superuser(self): if self._is_superuser is None: self._check_is_superuser() return self._is_superuser @is_superuser.setter def is_superuser(self, value): raise PermissionError("You are not allowed to change the superuser status of the database connection, as this is due to PostGreSQL database privileges of your database user.") def _check_privilege(self, privilege): if self._is_superuser: return True else: with self.connect() as con: if privilege == "CREATE": return con.execute(sqltxt( f"SELECT pg_catalog.has_schema_privilege('public', '{privilege}');" )).first()[0] res = con.execute(sqltxt( f"""SELECT bool_and(pg_catalog.has_table_privilege(table_name, '{privilege}')) FROM information_schema.TABLES WHERE table_schema=CURRENT_SCHEMA AND table_type='BASE TABLE';""" )).first()[0] if res is None: if privilege in self._priviledge_to_recheck: self.log.debug(f"Privilege {privilege} not found, probably because database isn't initiated. Setting as True.") else: self._priviledge_to_recheck.add(privilege) return True else: if privilege in self._priviledge_to_recheck: self._priviledge_to_recheck.remove(privilege) return res def _check_select_privilege(self): """Check on the database if the user has the SELECT privilege.""" self._select_privilege = self._check_privilege("SELECT") return self._select_privilege def _check_update_privilege(self): """Check on the database if the user has the UPDATE privilege.""" self._update_privilege = self._check_privilege("UPDATE") return self._update_privilege def _check_insert_privilege(self): """Check on the database if the user has the INSERT privilege.""" self._insert_privilege = self._check_privilege("INSERT") return self._insert_privilege def _check_create_privilege(self): """Check on the database if the user has the CREATE privilege.""" self._create_privilege = self._check_privilege("CREATE") return self._create_privilege def _check_delete_privilege(self): """Check on the database if the user has the DELETE privilege.""" self._delete_privilege = self._check_privilege("DELETE") return self._delete_privilege @property def select_privilege(self): """Does the user have the PostGreSQL SELECT privilege on the database?""" if self._select_privilege is None or "SELECT" in self._priviledge_to_recheck: self._check_select_privilege() return self._select_privilege @property def update_privilege(self): """Does the user have the PostGreSQL UPDATE privilege on the database?""" if self._update_privilege is None or "UPDATE" in self._priviledge_to_recheck: self._check_update_privilege() return self._update_privilege @property def insert_privilege(self): """Does the user have the PostGreSQL INSERT privilege on the database?""" if self._insert_privilege is None or "INSERT" in self._priviledge_to_recheck: self._check_insert_privilege() return self._insert_privilege @property def upsert_privilege(self): """Does the user have the PostGreSQL INSERT and UPDATE privilege on the database?""" return self.insert_privilege and self.update_privilege @property def create_privilege(self): """Does the user have the PostGreSQL CREATE privilege on the database?""" if self._create_privilege is None or "CREATE" in self._priviledge_to_recheck: self._check_create_privilege() return self._create_privilege @property def delete_privilege(self): """Does the user have the PostGreSQL DELETE privilege on the database?""" if self._delete_privilege is None or "DELETE" in self._priviledge_to_recheck: self._check_delete_privilege() return self._delete_privilege @property def all_privileges(self): """Does the user have all (SELECT, UPDATE, INSERT, DELETE, CREATE) PostGreSQL privileges on the database?""" return self.select_privilege and \ self.update_privilege and \ self.insert_privilege and \ self.create_privilege and \ self.delete_privilege def _privilege_setters(self, property_name): raise PermissionError(f"You are not allowed to change the value of {property_name} as this is due to PostGreSQL database privileges of your user.") @select_privilege.setter def select_privilege(self, value): self._privilege_setters("select_privilege") @update_privilege.setter def update_privilege(self, value): self._privilege_setters("update_privilege") @insert_privilege.setter def insert_privilege(self, value): self._privilege_setters("insert_privilege") @upsert_privilege.setter def upsert_privilege(self, value): self._privilege_setters("upsert_privilege") @create_privilege.setter def create_privilege(self, value): self._privilege_setters("create_privilege") @delete_privilege.setter def delete_privilege(self, value): self._privilege_setters("delete_privilege") @all_privileges.setter def all_privileges(self, value): self._privilege_setters("all_privileges")
[docs] def deco_is_superuser(self, target): """Decorator to check if the user is a superuser.""" @functools.wraps(target) def wrapper(*args, **kwargs): if self.is_superuser: return target(*args, **kwargs) else: raise PermissionError("You are no super user of the Database and therefor this function is not available.") return wrapper
[docs] def deco_select_privilege(self, target): """Decorator to check if the user has the SELECT privilege.""" @functools.wraps(target) def wrapper(*args, **kwargs): if self.select_privilege: return target(*args, **kwargs) else: raise PermissionError("You have no select privilege on the database and therefor this function is not available.") return wrapper
[docs] def deco_update_privilege(self, target): """Decorator to check if the user has the UPDATE privilege.""" @functools.wraps(target) def wrapper(*args, **kwargs): if self.update_privilege: return target(*args, **kwargs) else: raise PermissionError("You have no update privilege on the database and therefor this function is not available.") return wrapper
[docs] def deco_insert_privilege(self, target): """Decorator to check if the user has the INSERT privilege.""" @functools.wraps(target) def wrapper(*args, **kwargs): if self.insert_privilege: return target(*args, **kwargs) else: raise PermissionError("You have no insert privilege on the database and therefor this function is not available.") return wrapper
[docs] def deco_upsert_privilege(self, target): """Decorator to check if the user has the INSERT and UPDATE privilege.""" @functools.wraps(target) def wrapper(*args, **kwargs): if self.upsert_privilege: return target(*args, **kwargs) else: raise PermissionError("You have no upsert privilege on the database and therefor this function is not available.") return wrapper
[docs] def deco_create_privilege(self, target): """Decorator to check if the user has the CREATE privilege.""" @functools.wraps(target) def wrapper(*args, **kwargs): if self.create_privilege: return target(*args, **kwargs) else: raise PermissionError("You are no admin user of the Database and therefor this function is not available.") return wrapper
[docs] def deco_delete_privilege(self, target): """Decorator to check if the user has the DELETE privilege.""" @functools.wraps(target) def wrapper(*args, **kwargs): if self.delete_privilege: return target(*args, **kwargs) else: raise PermissionError("You have no delete privilege on the database and therefor this function is not available.") return wrapper
[docs] def deco_all_privileges(self, target): """Decorator to check if the user has all (SELECT, UPDATE, INSERT, DELETE, CREATE) privileges.""" @functools.wraps(target) def wrapper(*args, **kwargs): if self.all_privileges: return target(*args, **kwargs) else: raise PermissionError("You are no super user of the Database and therefor this function is not available.") return wrapper
db_engine = DBEngine()