Source code for weatherDB.utils.dwd
"""
Some utilities functions to get data from the DWD-CDC server.
Based on `max_fun` package on https://github.com/maxschmi/max_fun
Created by Max Schmit, 2021
"""
# libraries
import dateutil
import ftplib
import pathlib
import geopandas as gpd
import pandas as pd
from zipfile import ZipFile
import re
from io import BytesIO, StringIO
import traceback
import logging
import time
import random
# DWD - CDC FTP Server
CDC_HOST = "opendata.dwd.de"
# logger
log = logging.getLogger(__name__)
# basic functions
# ----------------
[docs]
def dwd_id_to_str(id):
"""
Convert a station id to normal DWD format as str.
Parameters
----------
id : int or str
The id of the station.
Returns
-------
str
string of normal DWD Station id.
"""
return f"{id:0>5}"
def _dwd_date_parser(date_ser):
"""
Parse the dates from a DWD table to datetime.
Parameters
----------
date_ser : pd.Series of str or str
the string from the DWD table. e.g. "20200101" or "2020010112"
Returns
-------
datetime.datetime
The date as datetime.
"""
if not isinstance(date_ser, pd.Series):
raise ValueError("date_str must be a pd.Series of str")
# test if list or single str
char_num = len(date_ser.iloc[0])
# parse to correct datetime
if char_num == 8:
return pd.to_datetime(date_ser, format='%Y%m%d')
elif char_num == 10:
return pd.to_datetime(date_ser, format='%Y%m%d%H')
elif char_num == 12:
return pd.to_datetime(date_ser, format='%Y%m%d%H%M')
else:
raise ValueError("there was an error while converting the following to a correct datetime"+
date_ser.head())
# functions
# ---------
[docs]
def get_ftp_file_list(ftp_conn, ftp_folders):
"""Get a list of files in the folders with their modification dates.
Parameters
----------
ftp_conn : ftplib.FTP
Ftp connection.
ftp_folders : list of str or pathlike object
The directories on the ftp server to look for files.
Returns
-------
list of tuples of strs
A list of Tuples. Every tuple stands for one file.
The tuple consists of (filepath, modification date).
"""
# check types
if isinstance(ftp_folders, str):
ftp_folders = [ftp_folders]
for i, ftp_folder in enumerate(ftp_folders):
if isinstance(ftp_folder, pathlib.Path):
ftp_folders[i] = ftp_folder.as_posix()
try:
ftp_conn.voidcmd("NOOP")
except ftplib.all_errors:
ftp_conn.connect()
# get files and modification dates
files = []
for ftp_folder in ftp_folders:
lines = []
ftp_conn.dir(ftp_folder, lines.append)
for line in lines:
parts = line.split(maxsplit=9)
filepath = ftp_folder + parts[8]
modtime = dateutil.parser.parse(parts[5] + " " + parts[6] + " " + parts[7])
files.append((filepath, modtime))
return files
[docs]
def get_cdc_file_list(ftp_folders):
with ftplib.FTP(CDC_HOST) as ftp_con:
ftp_con.login()
files = get_ftp_file_list(ftp_con, ftp_folders)
return files
[docs]
def get_dwd_file(zip_filepath):
"""
Get a DataFrame from one single (zip-)file from the DWD FTP server.
Parameters
----------
zip_filepath : str
Path to the file on the server. e.g.
- "/climate_environment/CDC/observations_germany/climate/10_minutes/air_temperature/recent/10minutenwerte_TU_00044_akt.zip"
- "/climate_environment/CDC/derived_germany/soil/daily/historical/derived_germany_soil_daily_historical_73.txt.gz"
Returns
-------
pandas.DataFrame
The DataFrame of the selected file in the zip folder.
"""
# get the compressed folder from dwd
with ftplib.FTP(CDC_HOST) as ftp:
ftp.login()
# download file
compressed_bin = BytesIO()
num_tried = 0
while num_tried < 10:
try:
ftp.retrbinary("RETR " + zip_filepath, compressed_bin.write)
break
except Exception as e:
if num_tried < 9:
num_tried += 1
time.sleep(random.randint(0,400)/100)
else:
raise e
# check folder to be derived or observation type import the data
if re.search("observations", zip_filepath):
# get zip folder and files
compressed_folder = ZipFile(compressed_bin)
compressed_folder_files = compressed_folder.namelist()
# test if one and only one file matches the pattern
files = list(filter(re.compile("produkt").search,
compressed_folder_files))
if len(files) == 0:
raise ValueError(
"There is no file matching the pattern: produkt " +
"in the zip files: \n- " +
"\n- ".join(compressed_folder_files))
elif len(files) > 1:
raise ValueError(
"There are more than one files matching the " +
"pattern: produkt\nin the zip file: " +
str(zip_filepath) +
"\nonly the first file is returned: " +
str(files[0]))
# extract the file from the zip folder and return it as pd.DataFrame
with compressed_folder.open(files[0]) as f:
df = pd.read_table(f, sep=";",
dtype={"Datum":str, "MESS_DATUM":str},
skipinitialspace=True,
na_values=[-999, -9999, "####", "#####", "######"])
elif re.search("derived", zip_filepath):
df = pd.read_table(f"ftp://{CDC_HOST}/{zip_filepath}",
compression="gzip",
sep=";",
skipinitialspace=True,
dtype={"Datum":str, "MESS_DATUM":str},
na_values=[-999, -9999, "####", "#####", "######"])
else:
raise ImportError("ERROR: No file could be imported, as there is " +
"just a setup for observation and derived datas")
# convert dates to datetime
for col in ["MESS_DATUM", "Datum"]:
if col in df.columns:
df[col] = _dwd_date_parser(df[col])
return df
[docs]
def get_dwd_meta(ftp_folder):
"""
Get the meta file from the ftp_folder on the DWD server.
Downloads the meta file of a given folder.
Corrects the meta file of missing files. So if no file for the station is
in the folder the meta entry gets deleted.
Reset "von_datum" in meta file if there is a biger gap than max_hole_d.
Delets entries with less years than min_years.
Parameters
----------
ftp_folder : str
The path to the directory where to search for the meta file.
e.g. "climate_environment/CDC/observations_germany/climate/hourly/precipitation/recent/".
Returns
-------
geopandas.GeoDataFrame
a GeoDataFrame of the meta file
"""
# open ftp connection and get list of files in folder
with ftplib.FTP(CDC_HOST) as ftp:
ftp.login()
# get and check the meta_file name
ftp_files = ftp.nlst(ftp_folder)
pattern = r".*(?<!_mn4)((_stations_list)|(_Beschreibung_Stationen))+.txt$"
meta_file = list(filter(re.compile(pattern).match, ftp_files))
if len(meta_file) == 0:
log.info(
f"There is no file matching the pattern '{pattern}'"+
f"\nin the folder: ftp://{CDC_HOST}/{str(ftp_folder)}")
return None
elif len(meta_file) > 1:
log.info(
f"There are more than one files matching the pattern: {pattern}" +
f" in the folder:\nftp://{CDC_HOST}/{str(ftp_folder)}" +
f"\nonly the first file is returned: {meta_file[0]}")
# import meta file
try:
if re.search("observations", ftp_folder):
with ftplib.FTP(CDC_HOST) as ftp:
ftp.login()
with BytesIO() as bio, StringIO() as sio:
ftp.retrbinary("RETR " + meta_file[0], bio.write)
sio.write(bio.getvalue().decode("WINDOWS-1252").replace("\r\n", "\n"))
colnames = sio.getvalue().split("\n")[0].split()
sio.seek(0)
meta = pd.read_table(
sio,
skiprows=2,
lineterminator="\n",
sep=r"\s{2,}|(?<=\d|\))\s{1}(?=[\w])", # two or more white spaces or one space after digit and followed by word
names=colnames,
parse_dates=[col for col in colnames if "datum" in col.lower()],
index_col="Stations_id",
engine="python")
elif re.search("derived", ftp_folder):
meta = pd.read_table("ftp://opendata.dwd.de/" + meta_file[0],
encoding="WINDOWS-1252", sep=";", skiprows=1,
names=["Stations_id", "Stationshoehe",
"geoBreite", "geoLaenge",
"Stationsname", "Bundesland"],
index_col="Stations_id"
)
except:
traceback.print_exc()
print("URL Error: The URL could not be found:\n" +
"ftp://opendata.dwd.de/" + meta_file[0])
return None
try:
meta = gpd.GeoDataFrame(meta,
geometry=gpd.points_from_xy(meta.geoLaenge,
meta.geoBreite,
crs="EPSG:4326"))
meta = meta.drop(["geoLaenge", "geoBreite"], axis=1)
except:
traceback.print_exc()
print("Error while converting DataFrame to GeoDataFrame," +
" maybe the columns aren't named 'geoLaenge' and geoBreite'" +
"\nhere is the header of the DataFrame:\n")
print(meta.head())
return None
# delete entries where there is no file in the ftp-folder
rows_drop = []
str_ftp_files = str(ftp_files)
for i, row in meta.iterrows():
if not (re.search(r"[_\.]" + dwd_id_to_str(i) + r"[_\.]|" +
r"[_\.]" + str(i) + r"[_\.]", str_ftp_files)):
rows_drop.append(i)
meta = meta.drop(rows_drop)
# change meta date entries if the file has a different date
if ("observation" in ftp_folder) \
and ("bis_datum" and "von_datum" in meta) \
and ("recent" not in ftp_folder):
zip_files = list(filter(re.compile(r".+\d+_\d+_\d+_hist.zip").match,
ftp_files))
zip_files.sort()
zip_files.append(zip_files[0]) # else the last entry won't get tested
last_sid, last_from_date, last_to_date = None, None, None
for zip_file in zip_files:
# get new files dates
filename = zip_file.split("/")[-1]
_, kind, sid, from_date, to_date, _ = filename.split("_")
if kind in ["niedereder"]:
continue
from_date = pd.Timestamp(from_date)
to_date = pd.Timestamp(to_date)
sid = int(sid)
# compare with previous file's dates
if last_sid and (sid == last_sid):
last_to_date = to_date
else:
# compare last values with meta file dates
if last_sid and (last_sid in meta.index):
if last_from_date > meta.loc[last_sid, "von_datum"]:
meta.loc[last_sid, "von_datum"] = last_from_date
if last_to_date < meta.loc[last_sid, "bis_datum"]:
meta.loc[last_sid, "bis_datum"] = last_to_date
# set values as last values
last_to_date = to_date
last_from_date = from_date
last_sid = sid
# trim whitespace in string columns
for dtype, col in zip(meta.dtypes, meta.columns):
if pd.api.types.is_string_dtype(dtype) and col != "geometry":
meta[col] = meta[col].str.strip()
# return
return meta