Source code for reegis.geometries

# -*- coding: utf-8 -*-

"""Reegis geometry tools.

SPDX-FileCopyrightText: 2016-2021 Uwe Krien <krien@uni-bremen.de>

SPDX-License-Identifier: MIT
"""
__copyright__ = "Uwe Krien <krien@uni-bremen.de>"
__license__ = "MIT"


# Python libraries
import os
import logging

# Internal libraries
from reegis import config as cfg

# External libraries
import pandas as pd
import geopandas as gpd
from shapely.wkt import loads as wkt_loads
from shapely.geometry import Point
from shapely.geometry.base import BaseGeometry


[docs]def get_federal_states_polygon(): """ Get a region set for the federal states of Germany. Examples -------- >>> list(get_federal_states_polygon().iloc[0:4].index) ['HH', 'NI', 'MV', 'SH'] """ federal_states = load( cfg.get("paths", "geometry"), cfg.get("geometry", "federalstates_polygon"), ) return federal_states.set_index("iso")
[docs]def get_germany_with_awz_polygon(): return get_germany_polygon(with_awz=True)
[docs]def get_germany_polygon(with_awz=False): """ Get the polygon of Germany with the exclusive economic zone of Germany in one polygon. Examples -------- >>> int(get_germany_polygon(with_awz=True).to_crs(epsg=25832).area[0]/1e6) 414537 >>> int(get_germany_polygon(with_awz=False).to_crs(epsg=25832).area[0]/1e6) 357047 """ if with_awz: name = "germany_awz_polygon" else: name = "germany_polygon" return load( cfg.get("paths", "geometry"), "{0}.geojson".format(name), index_col="gid", )
[docs]def load( path=None, filename=None, fullname=None, hdf_key=None, index_col=None, crs=None, ): """ Load files with geographic information into a GeoDataFrame. Allowed types are csv, hdf, shp and geojson. """ if fullname is None: fullname = os.path.join(path, filename) suffix = fullname.split(".")[-1] if suffix == "csv": df = load_csv(fullname=fullname, index_col=index_col) gdf = create_geo_df(df, crs=crs) elif suffix == "hdf" or suffix == "h5": df = pd.DataFrame(load_hdf(fullname=fullname, key=hdf_key)) gdf = create_geo_df(df, crs=crs) elif suffix == "shp" or suffix == "geojson": gdf = load_shp(fullname=fullname) if index_col is not None: gdf.set_index(index_col, inplace=True) else: raise ValueError( "Cannot load file with a '{0}' extension.".format(suffix) ) return gdf
[docs]def load_shp(path=None, filename=None, fullname=None): """Load an `shp` file as GeoDataFrame.""" if fullname is None: fullname = os.path.join(path, filename) return gpd.read_file(fullname)
[docs]def load_hdf(path=None, filename=None, fullname=None, key=None): """Load a `hdf` file.""" if fullname is None: fullname = os.path.join(path, filename) return pd.read_hdf(fullname, key)
[docs]def load_csv(path=None, filename=None, fullname=None, index_col=None): """Load csv-file into a DataFrame.""" if fullname is None: fullname = os.path.join(path, filename) df = pd.read_csv(fullname) # Make the first column the index if all values are unique. if index_col is None: first_col = df.columns[0] if not any(df[first_col].duplicated()): df.set_index(first_col, drop=True, inplace=True) else: df.set_index(index_col, drop=True, inplace=True) return df
[docs]def lat_lon2point(df): """Create shapely point object of latitude and longitude.""" return Point(df["longitude"], df["latitude"])
[docs]def create_geo_df( df, wkt_column=None, lon_column=None, lat_column=None, crs=None ): """Convert pandas.DataFrame to geopandas.geoDataFrame""" if "geom" in df: df = df.rename(columns={"geom": "geometry"}) if lon_column is not None: if lon_column not in df: raise ValueError( "Cannot find column for longitude: {0}".format(lon_column) ) else: df.rename(columns={lon_column: "longitude"}, inplace=True) if lat_column is not None: if lat_column not in df: raise ValueError( "Cannot find column for latitude: {0}".format(lat_column) ) else: df.rename(columns={lat_column: "latitude"}, inplace=True) if wkt_column is not None: df["geometry"] = df[wkt_column].apply(wkt_loads) elif "geometry" not in df and "longitude" in df and "latitude" in df: df["geometry"] = df.apply(lat_lon2point, axis=1) elif "geometry" not in df: msg = "Could not create GeoDataFrame. Missing geometries." raise ValueError(msg) elif isinstance(df.iloc[0]["geometry"], str): df["geometry"] = df["geometry"].apply(wkt_loads) elif isinstance(df.iloc[0]["geometry"], BaseGeometry): pass if crs is None: crs = "epsg:4326" gdf = gpd.GeoDataFrame(df, crs=crs, geometry="geometry") logging.debug("GeoDataFrame created.") return gdf
[docs]def remove_invalid_geometries(gdf): """Remove rows that do not have a valid geometry.""" logging.warning("Invalid geometries have been removed.") invalid = gdf.loc[~gdf.is_valid].copy() if float(invalid["capacity"].sum()) > 0: logging.warning( "Removed capacity due to invalid geometry: {0}".format( invalid["capacity"].sum() ) ) return gdf.loc[gdf.is_valid]
[docs]def spatial_join_with_buffer( geo1, geo2, name, jcol="index", step=0.05, limit=1 ): """Add name of containing region to new column for all points. Parameters ---------- geo1 : geopandas.geoDataFrame Point layer. geo2 : geopandas.geoDataFrame Polygon layer. jcol : str name : str Name of the new column with the region names/identifiers. step : float limit : float Returns ------- geopandas.geoDataFrame """ if jcol == "index": jcol = "index_right" if isinstance(geo1, gpd.GeoSeries): crs = geo1.crs geo1 = gpd.GeoDataFrame(geo1, columns=["geometry"]) geo1.crs = crs logging.debug("Doing spatial join...") # Spatial (left) join with the "within" operation. jgdf = gpd.sjoin(geo1, geo2, how="left", op="within") logging.info("Joined!") diff_cols = set(jgdf.columns) - set(geo1) - {jcol} # Buffer all geometries that are not within any polygon bf = 0 len_df = len(jgdf.loc[jgdf[jcol].isnull()]) if len_df == 0: logging.info("Buffering not necessary.") elif limit == 0: logging.info("No buffering. Buffer-limit is 0.") else: msg = "Buffering {0} non-matching geometries ({1}%)..." logging.info(msg.format(len_df, round(len_df / len(jgdf) * 100, 1))) if len_df * 5 > len(jgdf) and limit > 0: msg = "{0} % non-matching geometries seems to be too high." logging.warning(msg.format(round(len_df / len(jgdf) * 100))) while len_df > 0 and bf < limit: # Increase the buffer by step. bf += step # Add the buffer to all rows that did not match. jgdf.loc[jgdf[jcol].isnull(), "buffer"] = jgdf.loc[ jgdf[jcol].isnull() ].buffer(bf) # Create a temporary GeoDataFrame with the buffer as geometry tmp = jgdf.loc[jgdf[jcol].isnull()] del tmp[tmp.geometry.name] del tmp[jcol] if "index_right" in tmp: del tmp["index_right"] tmp = tmp.set_geometry("buffer") # Try spatial join with "intersects" with buffered geometries. newj = gpd.sjoin(tmp, geo2, how="left") # If new matches were found they were written to the original GeoDF. if len(newj.loc[newj[jcol].notnull() > 0]): # If two regions intersects with the buffer the first is taken. try: jgdf[jcol] = jgdf[jcol].fillna(newj[jcol]) except ValueError: newj = newj[~newj.index.duplicated(keep="first")] jgdf[jcol] = jgdf[jcol].fillna(newj[jcol]) logging.warning( "Two matches found while buffering, first one taken." ) logging.warning("Use smaller steps to avoid this behaviour.") # Calculate the number of non-matching geometries. len_df = len(jgdf.loc[jgdf[jcol].isnull()]) logging.info( "Buffer: {0}, Remaining_length: {1}".format(bf, len_df) ) if len_df == 0: logging.info("All geometries matched after buffering.") if bf == limit: logging.info("Stop buffering. Reached buffer limit.") # delete the temporary buffer column if "buffer" in jgdf.columns: del jgdf["buffer"] # Remove all columns but the join-id column (jcol) from the GeoDf. for col in diff_cols: del jgdf[col] jgdf = jgdf.rename(columns={jcol: name}) jgdf[name] = jgdf[name].fillna("unknown") logging.info("New column '{0}' added to GeoDataFrame.".format(name)) return jgdf