# Source code for weatherDB.lib.max_fun.import_DWD
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""A collection of functions to import data from the DWD-CDC Server."""
__author__ = "Max Schmit"
__copyright__ = "Copyright 2021, Max Schmit"
# libraries
import os
import geopandas as gpd
import pandas as pd
from datetime import datetime
from zipfile import ZipFile
import ftplib
import re
from io import BytesIO
import traceback
import logging
from pathlib import Path
import time
import random
from socket import gethostname
# logger
# Module logger that writes to "<module dir>/logs/DWD_import_<hostname>.txt".
log = logging.getLogger("import_DWD")
this_dir = Path(__file__).parent
log_dir = this_dir.joinpath("logs")
# exist_ok=True replaces the former "is_dir() then mkdir()" sequence, which
# could raise FileExistsError when two processes imported the module at the
# same moment and both saw the directory as missing.
log_dir.mkdir(exist_ok=True)
log_fh = logging.FileHandler(
    log_dir.joinpath("DWD_import_" + gethostname() + ".txt"))
log.setLevel(logging.DEBUG)
log.addHandler(log_fh)
# ftp connection
class FTP(ftplib.FTP):
    """ftplib.FTP whose ``login`` can be called repeatedly on one connection.

    The module keeps one shared connection and calls ``login()`` before
    every use. This subclass makes that call harmless when the session is
    already logged in and re-opens the connection when it was dropped.
    """

    # upper bound of reconnect attempts inside one login() call
    _max_reconnects = 3

    def login(self, *args, **kwargs):
        """
        Log in to the server, reconnecting if the connection was lost.

        Parameters
        ----------
        *args, **kwargs
            Forwarded unchanged to ``ftplib.FTP.login``
            (``user``, ``passwd``, ``acct``).

        Returns
        -------
        None

        Raises
        ------
        ConnectionAbortedError, ftplib.error_temp, BrokenPipeError
            If the login still fails after ``_max_reconnects`` reconnects.
        """
        for attempt in range(self._max_reconnects + 1):
            try:
                super().login(*args, **kwargs)
                return None
            except (ConnectionAbortedError, ftplib.error_temp,
                    BrokenPipeError):
                # The connection is gone or the server is temporarily
                # unavailable. Retry a bounded number of times; the former
                # recursive retry had no limit.
                if attempt >= self._max_reconnects:
                    raise
                # release the dead socket before opening a new connection
                self.close()
                self.__init__(self.host)
            except (ftplib.error_perm, EOFError, ftplib.error_reply):
                # Treated as "the connection is already logged in".
                # NOTE(review): a genuine permission failure is swallowed
                # here as well - confirm this is acceptable.
                return None
        return None
# Shared module-level connection to the DWD open-data server; the functions
# in this module reuse it instead of opening their own connection.
# NOTE: this connects to "opendata.dwd.de" at import time.
FTP_CDC = FTP("opendata.dwd.de")
# login() is called without credentials
FTP_CDC.login()
# basic functions
# ----------------
def dwd_id_to_str(id):
    """
    Convert a station id to normal DWD format as str.

    The id is zero-padded to five digits, e.g. 44 -> "00044". Ids with more
    than five digits are returned unpadded and are not truncated.

    Parameters
    ----------
    id : int or str
        The id of the station.

    Returns
    -------
    str
        string of normal DWD Station id.

    Raises
    ------
    ValueError
        If `id` can not be converted to an integer.
    """
    # The previous "+ 100000, then slice" trick silently cut the leading
    # digit of ids above 99999, mapping them onto a different station id.
    return "{:05d}".format(int(id))
def _dwd_date_parser(date_str):
"""
Parse the dates from a DWD table to datetime.
Parameters
----------
date_str : list of str or str
the string from the DWD table. e.g. "20200101" or "2020010112"
Returns
-------
datetime.datetime
The date as datetime.
"""
# test if list or single str
if type(date_str) == list:
char_num = len(date_str[0])
elif type(date_str) == str:
char_num = len(date_str)
# parse to correct datetime
if char_num == 8:
return datetime.strptime(date_str, '%Y%m%d')
elif char_num == 10:
return datetime.strptime(date_str, '%Y%m%d%H')
elif char_num == 12:
return datetime.strptime(date_str, '%Y%m%d%H%M')
else:
raise ValueError("there was an error while converting " + date_str +
" to a correct datetime")
# main functions
def get_dwd_file(zip_filepath):
    """
    Get a DataFrame from one single (zip-)file from the DWD FTP server.

    The file is downloaded through the shared FTP connection, with up to
    10 attempts. "observations" files are zip archives from which the
    single member containing "produkt" is read; "derived" files are read
    as gzip compressed tables.

    Parameters
    ----------
    zip_filepath : str
        Path to the file on the server. e.g.
        - "/climate_environment/CDC/observations_germany/climate/10_minutes/air_temperature/recent/10minutenwerte_TU_00044_akt.zip"
        - "/climate_environment/CDC/derived_germany/soil/daily/historical/derived_germany_soil_daily_historical_73.txt.gz"

    Returns
    -------
    pandas.DataFrame
        The DataFrame of the selected file in the zip folder.

    Raises
    ------
    ImportError
        If the path is neither an "observations" nor a "derived" path.
    ValueError
        If the zip archive holds no or more than one "produkt" file.
    Exception
        The error of the last download attempt, if all attempts failed.
    """
    # decide the file type first, so unsupported paths fail before download
    is_observation = "observations" in zip_filepath
    is_derived = "derived" in zip_filepath
    if not (is_observation or is_derived):
        raise ImportError("ERROR: No file could be imported, as there is " +
                          "just a setup for observation and derived datas")

    # get the compressed file from dwd
    ftp = FTP_CDC
    ftp.login()
    compressed_bin = BytesIO()
    max_tries = 10
    for attempt in range(1, max_tries + 1):
        try:
            ftp.retrbinary("RETR " + zip_filepath, compressed_bin.write)
            break
        except Exception:
            if attempt == max_tries:
                raise
            # Throw away what the failed transfer already wrote; otherwise
            # the next attempt would append to it and corrupt the archive.
            compressed_bin.seek(0)
            compressed_bin.truncate()
            # wait a random time of up to 4 seconds before the next attempt
            time.sleep(random.randint(0, 400) / 100)
    compressed_bin.seek(0)

    # NOTE(review): `date_parser` is deprecated in pandas >= 2.0; kept here
    # because the date format differs between the DWD products.
    if is_observation:
        with ZipFile(compressed_bin) as compressed_folder:
            compressed_folder_files = compressed_folder.namelist()

            # test if one and only one file matches the pattern
            files = [name for name in compressed_folder_files
                     if "produkt" in name]
            if len(files) == 0:
                raise ValueError(
                    "There is no file matching the pattern: produkt " +
                    "in the zip files: \n- " +
                    "\n- ".join(compressed_folder_files))
            elif len(files) > 1:
                # the old message claimed that the first file is returned,
                # although the function raises here
                raise ValueError(
                    "There are more than one files matching the " +
                    "pattern: produkt\nin the zip file: " +
                    str(zip_filepath) +
                    "\nmatching files: \n- " +
                    "\n- ".join(files))

            # extract the file from the zip folder and return it
            with compressed_folder.open(files[0]) as f:
                return pd.read_table(f, sep=";",
                                     parse_dates=["MESS_DATUM"],
                                     date_parser=_dwd_date_parser,
                                     skipinitialspace=True,
                                     na_values=[-999, "####", "#####", "######"])

    # derived data: parse the bytes that were already downloaded instead of
    # fetching the same file a second time through an ftp:// URL
    return pd.read_table(compressed_bin,
                         compression="gzip",
                         sep=";",
                         parse_dates=["Datum"],
                         date_parser=_dwd_date_parser,
                         skipinitialspace=True,
                         na_values=[-999, "####", "#####", "######"])
def get_dwd_data(station_id, ftp_folder):
    """
    Get the weather data for one station from the DWD server.

    Parameters
    ----------
    station_id : str or int
        Number of the station to get the weather data from.
    ftp_folder : str
        the base folder where to look for the stations_id file.
        e.g. ftp_folder = "climate_environment/CDC/observations_germany/climate/hourly/precipitation/historical/".
        If the parent folder, where "recent"/"historical" folder is inside, both the historical and recent data gets merged.

    Returns
    -------
    pandas.DataFrame or None
        The data of all files found for the station, indexed by the date
        column. None if no file was found for the station.

    Raises
    ------
    ImportError
        If the folder is neither an "observations" nor a "derived" folder,
        or if none of the found files could be imported.
    """
    # check folder to be derived or observation type
    if "observations" in ftp_folder:
        station_id = dwd_id_to_str(station_id)
        date_col = "MESS_DATUM"
    elif "derived" in ftp_folder:
        station_id = str(int(station_id))
        date_col = "Datum"
    else:
        # used to fail later with an unrelated NameError/TypeError
        raise ImportError("ERROR: No file could be imported, as there is " +
                          "just a setup for observation and derived datas")

    # open ftp-server connection
    ftp = FTP_CDC
    ftp.login()

    # test if recent or historical specified
    if re.search(r"((observations)|(derived))(?!.*((historical)|(recent))[\/]*$)", ftp_folder):
        if (ftp_folder[-1] != "/"):
            ftp_folder += "/"
        ftp_folders = [ftp_folder + "historical", ftp_folder + "recent"]
    else:
        ftp_folders = [ftp_folder]

    # test if station is in folder or even several times
    comp = re.compile(r".*_" + station_id + r"[_\.].*")
    zipfilenames = []
    # separate loop name: ftp_folder must stay intact for the log messages
    for folder in ftp_folders:
        zipfilenames.extend(filter(comp.match, ftp.nlst(folder)))

    if len(zipfilenames) == 0:
        log.debug("There is no file for the Station " + station_id + " in " +
                  "ftp://opendata.dwd.de/" + ftp_folder)
        return None
    elif len(zipfilenames) > 1:
        log.info("There are several files for the Station " + station_id +
                 " in: \nftp://opendata.dwd.de/" + ftp_folder +
                 "\nthey will get concatenated together! \n" +
                 "These are the files:\n" + "\n".join(zipfilenames))

    # import every file and merge data
    df_all = None
    for zipfilename in zipfilenames:
        try:
            df_new = get_dwd_file(zipfilename)
        except IndexError:
            log.info("The following file could not get imported " +
                     str(zipfilename))
            continue
        if df_all is None:
            df_all = df_new
        else:
            # cut out if already in previous file
            df_new = df_new[~df_new[date_col].isin(df_all[date_col])]
            # concatenate the dfs
            df_all = pd.concat([df_all, df_new])
            df_all.reset_index(drop=True, inplace=True)

    # check if everything worked
    if df_all is None:
        raise ImportError("The file(s) for the dwd station " +
                          str(station_id) + " couldn't get imported.")

    # Use the date column as index and average duplicated timestamps.
    # This stays best-effort as before, but problems are logged now.
    if date_col in df_all.columns:
        df_all.set_index(date_col, inplace=True)
        if df_all.index.has_duplicates:
            try:
                df_all = df_all.groupby(df_all.index).mean()
            except Exception:
                log.warning("The duplicated dates of the Station " +
                            station_id + " could not get averaged, " +
                            "the duplicates are kept.")
    else:
        log.warning("The date column " + date_col + " is missing for the " +
                    "Station " + station_id + ", the index was not set.")

    return df_all
def _concat_dwd_data(df1, df2):
"""Concatenet 2 dwd stations dataframes to one.
Is usefull for concatenating several dataframes of the same station together
or historical and recent datas.
Parameters
----------
df1 : pd.DataFrame
the first DataFrame of one DWD Station.
df2 : pd.DataFrame
the second DataFrame of one DWD Station.
Returns
-------
pd.DataFrame
a new DataFrame that contain the data of the 2 input DFs.
"""
# get Date column name
if "MESS_DATUM" in df1.columns:
datecolumn = "MESS_DATUM"
else:
datecolumn = "Datum"
# check for which df is oldest
if df1[datecolumn].min() < df2[datecolumn].min():
df_old = df1
df_young = df2
else:
df_old = df2
df_young = df1
# check if overlapping data
if df_old[datecolumn].max() > df_young[datecolumn].min():
df_young = df_young.loc[df_young[datecolumn] > df_old[datecolumn].max()]
df_concat = pd.concat([df_old, df_young])
df_concat.reset_index(drop=True, inplace=True)
return df_concat
def _dwd_file_id_tokens(ftp_files):
    """Return the set of digit groups enclosed by "_" or "." in the names."""
    token_re = re.compile(r"(?<=[_.])\d+(?=[_.])")
    tokens = set()
    for name in ftp_files:
        tokens.update(token_re.findall(name))
    return tokens


def _update_meta_dates_from_hist_files(meta, ftp_files, max_hole_d):
    """Narrow "von_datum"/"bis_datum" in meta to the "_hist.zip" file dates.

    The dates are read from the file names
    (``..._<id>_<from>_<to>_hist.zip``). Several files of one station are
    treated as one period, unless the gap between two files is bigger than
    max_hole_d days; then the period restarts at the newer file.
    `meta` is modified in place.
    """
    hist_re = re.compile(r"_(\d+)_(\d+)_(\d+)_hist\.zip$")
    max_hole_d_td = pd.Timedelta(days=max_hole_d + 1)

    def apply_period(sid, from_date, to_date):
        # write the collected period of one station into meta
        if sid is None or sid not in meta.index:
            return
        if from_date > meta.loc[sid, "von_datum"]:
            meta.loc[sid, "von_datum"] = from_date
        if to_date < meta.loc[sid, "bis_datum"]:
            meta.loc[sid, "bis_datum"] = to_date

    last_sid, last_from_date, last_to_date = None, None, None
    # sorted, so that the files of one station follow each other by date
    for zip_file in sorted(ftp_files):
        filename = zip_file.split("/")[-1]
        match = hist_re.search(filename)
        if match is None:
            continue
        # Only the last three numeric parts are needed, so names with more
        # or fewer "_" separated parts no longer raise a ValueError.
        kind = filename.split("_")[1]
        if kind in ["niedereder"]:
            continue
        sid = int(match.group(1))
        from_date = pd.Timestamp(match.group(2))
        to_date = pd.Timestamp(match.group(3))

        if sid == last_sid:
            # further file of the same station
            if (from_date - last_to_date) > max_hole_d_td:
                last_from_date = from_date
            last_to_date = to_date
        else:
            # new station: store the finished one and start over
            apply_period(last_sid, last_from_date, last_to_date)
            last_sid, last_from_date, last_to_date = sid, from_date, to_date

    # the last station is stored explicitly instead of appending a sentinel
    apply_period(last_sid, last_from_date, last_to_date)


def get_dwd_meta(ftp_folder, min_years=0, max_hole_d=9999):
    """
    Get the meta file from the ftp_folder on the DWD server.

    Downloads the meta file of a given folder.
    Corrects the meta file of missing files. So if no file for the station is
    in the folder the meta entry gets deleted.
    Reset "von_datum" in meta file if there is a bigger gap than max_hole_d.
    Deletes entries with less years than min_years.

    Parameters
    ----------
    ftp_folder : str
        The path to the directory where to search for the meta file.
        e.g. "climate_environment/CDC/observations_germany/climate/hourly/precipitation/recent/".
    min_years : int, optional
        filter the list of stations by a minimum amount of years,
        that they have data for. 0 if the data should not get filtered.
        Only works if the meta file has a timerange defined,
        e.g. in "observations".
        The default is 0.
    max_hole_d : int
        The maximum amount of days missing in the data allowed.
        If there are several files for one station and the time hole is bigger
        than this value, the older "von_datum" is overwritten
        in the meta GeoDataFrame.
        The default is 9999.

    Returns
    -------
    geopandas.GeoDataFrame or None
        a GeoDataFrame of the meta file. None if no meta file was found,
        or if it could not be read or converted.

    Raises
    ------
    ImportError
        If a meta file was found, but the folder is neither an
        "observations" nor a "derived" folder.
    """
    # open ftp connection and get list of files in folder
    ftp = FTP_CDC
    ftp.login()

    # get and check the meta_file name
    ftp_files = ftp.nlst(ftp_folder)
    # A group with alternation: the former "[(...)(...)]" was a character
    # class and matched any name with one of those characters before "txt".
    pattern = r".+(_stations_list|_Beschreibung_Stationen)\.txt$"
    meta_file = list(filter(re.compile(pattern).match, ftp_files))

    if len(meta_file) == 0:
        log.info("There is no file matching the pattern: " + pattern +
                 "\nin the folder: ftp://opendata.dwd.de/" + str(ftp_folder))
        return None
    elif len(meta_file) > 1:
        log.info("There are more than one files matching the pattern: " +
                 pattern + "in the folder:\nftp://opendata.dwd.de/" +
                 str(ftp_folder) + "\nonly the first file is returned: " +
                 meta_file[0])

    is_observation = "observations" in ftp_folder
    if not (is_observation or "derived" in ftp_folder):
        # used to end in a NameError, as no table was read at all
        raise ImportError("ERROR: The meta file could not be imported, as " +
                          "there is just a setup for observation and " +
                          "derived datas")

    # import meta file
    try:
        if is_observation:
            meta = pd.read_table("ftp://opendata.dwd.de/" + meta_file[0],
                skiprows=2, encoding="WINDOWS-1252",
                sep=r"\s{2,}|(?<=\d)\s{1}(?=[\w])", # two or more white spaces or one space after word or digit and followed by word
                names=["Stations_id", "von_datum",
                       "bis_datum", "Stationshoehe",
                       "geoBreite", "geoLaenge",
                       "Stationsname", "Bundesland"],
                parse_dates=["von_datum", "bis_datum"],
                index_col="Stations_id",
                engine="python")
        else:
            meta = pd.read_table("ftp://opendata.dwd.de/" + meta_file[0],
                encoding="WINDOWS-1252", sep=";", skiprows=1,
                names=["Stations_id", "Stationshoehe",
                       "geoBreite", "geoLaenge",
                       "Stationsname", "Bundesland"],
                index_col="Stations_id"
                )
    except Exception:
        traceback.print_exc()
        print("URL Error: The URL could not be found:\n" +
              "ftp://opendata.dwd.de/" + meta_file[0])
        return None

    try:
        meta = gpd.GeoDataFrame(meta,
            geometry=gpd.points_from_xy(meta.geoLaenge,
                                        meta.geoBreite,
                                        crs="EPSG:4326"))
        meta = meta.drop(["geoLaenge", "geoBreite"], axis=1)
    except Exception:
        traceback.print_exc()
        print("Error while converting DataFrame to GeoDataFrame," +
              " maybe the columns aren't named 'geoLaenge' and geoBreite'" +
              "\nhere is the header of the DataFrame:\n")
        # the header was announced, but never actually printed before
        print(meta.head())
        return None

    # delete entries where there is no file in the ftp-folder
    # (the ids of all file names are collected once, not searched per row)
    file_ids = _dwd_file_id_tokens(ftp_files)
    rows_drop = [i for i in meta.index
                 if dwd_id_to_str(i) not in file_ids
                 and str(i) not in file_ids]
    meta = meta.drop(rows_drop)

    # `"bis_datum" and "von_datum" in meta` only tested "von_datum"
    has_dates = ("bis_datum" in meta) and ("von_datum" in meta)

    # change meta date entries if the file has a different date
    if ("observation" in ftp_folder) and has_dates \
            and ("recent" not in ftp_folder):
        _update_meta_dates_from_hist_files(meta, ftp_files, max_hole_d)

    # delete entries that do not exceed the minimum amount of years
    if has_dates:
        days = meta.bis_datum - meta.von_datum
        meta = meta[days >= pd.Timedelta(days=min_years * 365.25)]
    else:
        log.error("the meta file has no time columns, therefor the table " +
                  "can't get filtered for the available years")

    # return
    return meta
# debug
# Manual debugging snippet, disabled on purpose by `if False` so that nothing
# is downloaded when the module gets imported.
if False:
    meta = get_dwd_meta(
        ftp_folder="climate_environment/CDC/observations_germany/climate/10_minutes/precipitation/historical/",
        min_years=11, max_hole_d=365)
    print(len(meta))