# -*- coding: utf-8 -*-
"""Aggregate the number of inhabitants for a regions/polygons within Germany.
SPDX-FileCopyrightText: 2016-2021 Uwe Krien <krien@uni-bremen.de>
SPDX-License-Identifier: MIT
"""
__copyright__ = "Uwe Krien <krien@uni-bremen.de>"
__license__ = "MIT"
# Python libraries
import os
import zipfile
import shutil
import glob
import logging
# External libraries
import geopandas as gpd
# Internal modules
from reegis import config as cfg
from reegis import geometries
from reegis import tools
[docs]def get_ew_shp_file(year):
"""
Parameters
----------
year
Returns
-------
Examples
--------
>>> print(get_ew_shp_file(2014)[-35:])
data/inhabitants/VG250_VWG_2014.shp
"""
if year < 2011:
logging.error("Shapefile with inhabitants are available since 2011.")
logging.error("Try to find another source to get older data sets.")
raise AttributeError("Years < 2011 are not allowed in this function.")
outshp = os.path.join(
cfg.get("paths", "inhabitants"), "VG250_VWG_" + str(year) + ".shp"
)
if not os.path.isfile(outshp):
url = cfg.get("inhabitants", "url_geodata_ew").format(
year=year, var1="{0}"
)
filename_zip = os.path.join(
cfg.get("paths", "inhabitants"),
cfg.get("inhabitants", "vg250_ew_zip"),
)
msg = tools.download_file(filename_zip, url.format("ebene"))
if msg == 404:
logging.warning("Wrong URL. Try again with different URL.")
tools.download_file(
filename_zip, url.format("ebenen"), overwrite=True
)
zip_ref = zipfile.ZipFile(filename_zip)
zip_ref.extractall(cfg.get("paths", "inhabitants"))
zip_ref.close()
subs = next(os.walk(cfg.get("paths", "inhabitants")))[1]
mysub = None
for sub in subs:
if "vg250" in sub:
mysub = sub
pattern_path = list()
pattern_path.append(
os.path.join(
cfg.get("paths", "inhabitants"),
mysub,
"vg250-ew_ebenen",
"VG250_VWG*",
)
)
pattern_path.append(
os.path.join(
cfg.get("paths", "inhabitants"),
mysub,
"vg250-ew_ebenen",
"vg250_vwg*",
)
)
pattern_path.append(
os.path.join(
cfg.get("paths", "inhabitants"),
mysub,
"vg250_ebenen-historisch",
"de{0}12".format(str(year)[-2:]),
"vg250_vwg*",
)
)
for pa_path in pattern_path:
for file in glob.glob(pa_path):
file_new = os.path.join(
cfg.get("paths", "inhabitants"),
"VG250_VWG_" + str(year) + file[-4:],
)
shutil.copyfile(file, file_new)
shutil.rmtree(os.path.join(cfg.get("paths", "inhabitants"), mysub))
os.remove(filename_zip)
return outshp
[docs]def get_ew_geometry(year, polygon=False):
"""Get a map with the number of inhabitants."""
filename_shp = os.path.join(
cfg.get("paths", "inhabitants"), "VG250_VWG_" + str(year) + ".shp"
)
if not os.path.isfile(filename_shp):
get_ew_shp_file(year)
vwg = gpd.read_file(filename_shp)
# replace polygon geometry by its centroid
if polygon is False:
vwg["geometry"] = vwg.representative_point()
return vwg
[docs]def get_inhabitants_by_region(year, geo, name):
"""
Get inhabitants for the given region polygons.
Parameters
----------
year
geo
name
Returns
-------
pd.DataFrame
Examples
--------
>>> geo=geometries.get_federal_states_polygon()
>>> get_inhabitants_by_region(2014, geo, name='federal_states').sum()
81197537
"""
ew = get_ew_geometry(year)
ew = geometries.spatial_join_with_buffer(ew, geo, name=name, step=0.005)
return ew.groupby(name).sum()["EWZ"]
[docs]def get_inhabitants_by_multi_regions(year, geo, name):
"""
Get a MultiIndex table with the inhabitants from all given geometry sets.
Parameters
----------
year : int
geo : tuple or list
name : tuple or list
Returns
-------
Examples
--------
>>> geo1=geometries.load(
... cfg.get('paths', 'geometry'),
... cfg.get('geometry', 'de21_polygons'), index_col='region')
>>> geo2=geometries.get_federal_states_polygon()
>>> inh=get_inhabitants_by_multi_regions(
... 2014, [geo1, geo2], ['de21', 'fs'])
>>> inh.loc['DE01']['BB']
1811137
>>> inh.loc['DE01']['BE']
3469849
"""
ew = get_ew_geometry(year)
n = 0
for geo_one in geo:
ew = geometries.spatial_join_with_buffer(
ew, geo_one, name=name[n], step=0.005
)
n += 1
return ew.groupby(name).sum()["EWZ"]
[docs]def get_share_of_federal_states_by_region(year, regions, name):
"""
Parameters
----------
year : int
regions : tuple or list
name : tuple or list
Returns
-------
Examples
--------
>>> regions=geometries.load(
... cfg.get('paths', 'geometry'),
... cfg.get('geometry', 'de21_polygons'), index_col='region')
>>> inh=get_share_of_federal_states_by_region(2014, regions, 'de21')
>>> round(inh.loc['DE01']['BB'], 2)
0.74
>>> round(inh.loc['DE01']['BE'], 2)
1.0
"""
# Get inhabitants for federal states and the given regions
fs_geo = geometries.get_federal_states_polygon()
ew = get_inhabitants_by_multi_regions(
year, [regions, fs_geo], name=[name, "federal_states"]
)
ew = ew[ew != 0]
# Calculate the share of the federal states within the regions.
fs_sum = ew.groupby(level=1).sum().copy()
for reg in ew.index.get_level_values(0).unique():
for fs in ew.loc[reg].index:
ew.loc[reg, fs] = ew.loc[reg, fs] / fs_sum[fs]
return ew
[docs]def get_ew_by_federal_states(year):
"""Get the inhabitants per federal state for a given year."""
geo = geometries.load(
cfg.get("paths", "geometry"),
cfg.get("geometry", "federalstates_polygon"),
)
geo.set_index("iso", drop=True, inplace=True)
geo.drop(["N0", "N1", "O0", "P0"], inplace=True)
return get_inhabitants_by_region(year, geo, name="federal_states")
if __name__ == "__main__":
pass