Created
December 18, 2019 09:11
-
-
Save Hugovdberg/575a0a22c2d5d57358d1a97a5aa23bc4 to your computer and use it in GitHub Desktop.
Alternative implementation xsboringen
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from abc import ABC, abstractmethod | |
| from collections.abc import Sequence | |
| import copy | |
| from shapely.geometry import asShape, LineString | |
| class CopyMixin: | |
| def copy(self, deep=False): | |
| if deep: | |
| return copy.deepcopy(self) | |
| return copy.copy(self) | |
| class SubsurfaceLayer(CopyMixin, ABC): | |
| """ | |
| A SubsurfaceLayer represents a single layer, defined by a top surface and bottom | |
| surface, optionally having an associated style""" | |
| def __init__( | |
| self, | |
| name, | |
| top, | |
| bottom, | |
| label=None, | |
| facecolor=None, | |
| edgecolor=None, | |
| alpha=None, | |
| linewidth=None, | |
| ): | |
| self.name = name | |
| self.top = top | |
| self.bottom = bottom | |
| self.label = label or name | |
| self.facecolor = facecolor | |
| self.edgecolor = edgecolor or facecolor | |
| self.alpha = alpha | |
| self.linewidth = linewidth | |
| def __repr__(self): | |
| return "{s.__class__.__qualname__}({s.name!r})".format(s=self) | |
| @abstractmethod | |
| def sample(self, coords): | |
| pass | |
| class RasterFileSubsurfaceLayer(SubsurfaceLayer): | |
| def __init__(self, cache=True, **kwargs): | |
| super().__init__(**kwargs) | |
| self._cache = {} | |
| with rasterio.open(self.top) as top: | |
| self.xul, self.yul = top.bounds.left, top.bounds.top | |
| self.delx = (top.bounds.right - top.bounds.left) / top.width | |
| self.dely = (top.bounds.top - top.bounds.bottom) / top.height | |
| def sample(self, coords): | |
| grid_coords = list(zip(*self.grid_coord(coords))) | |
| unknown_coords = {coord for coord in grid_coords if coord not in self._cache} | |
| sample_top_base = zip( | |
| self._sample_raster(self.top, unknown_coords), | |
| self._sample_raster(self.bottom, unknown_coords), | |
| ) | |
| for coord, top_base in zip(unknown_coords, sample_top_base): | |
| self._cache[coord] = top_base | |
| for coord in grid_coords: | |
| yield self._cache[coord] | |
| def grid_coord(self, coord): | |
| x, y = coord[:, 0], coord[:, 1] | |
| x = x - (x - xul) % delx + delx / 2 | |
| y = y + (yul - y) % dely - dely / 2 | |
| return x, y | |
| @staticmethod | |
| def _sample_raster(rasterfile, coords): | |
| with rasterio.open(rasterfile) as src: | |
| for value in src.sample(coords): | |
| if value[0] in src.nodatavals: | |
| yield np.nan | |
| else: | |
| yield float(value[0]) | |
| class GridSubsurfaceLayer(SubsurfaceLayer): | |
| def __init__(self, xul, yul, delx, dely, **kwargs): | |
| super().__init__(**kwargs) | |
| self.xul = xul | |
| self.yul = yul | |
| self.delx = delx | |
| self.dely = dely | |
| def sample(self, coords): | |
| for coord in coords: | |
| r = (self.yul - coord[1]) // self.dely | |
| c = (coord[0] - self.xul) // self.delx | |
| yield self.top[r, c], self.bottom[r, c] | |
| class SubsurfaceModel: | |
| """A SubsurfaceModel is a vertical stacking of SubsurfaceLayer objects""" | |
| def __init__( | |
| self, | |
| layers=None, | |
| name=None, | |
| default_facecolor=None, | |
| default_edgecolor=None, | |
| default_alpha=None, | |
| default_linewidth=None, | |
| ): | |
| self.name = name | |
| self.layers = [] | |
| if layers: | |
| for layer in layers: | |
| self.add_layer(layer) | |
| self.default_facecolor = default_facecolor | |
| self.default_edgecolor = default_edgecolor | |
| self.default_alpha = default_alpha | |
| self.default_linewidth = default_linewidth | |
| def add_layer(self, layer: SubsurfaceLayer): | |
| if not isinstance(layer, SubsurfaceLayer): | |
| raise TypeError("Only SubsurfaceLayers can be used in a SubsurfaceModel") | |
| self.layers.append(layer) | |
| def __iter__(self): | |
| for i, layer in enumerate(self.layers): | |
| layer_ = layer.copy() | |
| layer_.facecolor = layer_.facecolor or self.default_facecolor | |
| layer_.edgecolor = layer_.edgecolor or self.default_edgecolor | |
| layer_.alpha = layer_.alpha or self.default_alpha | |
| layer_.linewidth = layer_.linewidth or self.default_linewidth | |
| yield layer_ | |
| class CrossSection: | |
| """ | |
| A CrossSection represents a vertical section of the subsurface, onto which objects | |
| and measurements can be projected, optionally with one or more SubsurfaceModels | |
| providing the (hydro)geological context""" | |
| def __init__(self, line, resolution, ylim=None, ax=None, figsize=None): | |
| self.line = line | |
| self.ylim = ylim | |
| self.resolution = resolution | |
| if not ax and figsize: | |
| _, ax = plt.subplots(figsize=figsize) | |
| self.ax = ax or plt.gca() | |
| self.models = [] | |
| self._legend_items = {} | |
| self._coords = [] | |
| def add_subsurfacemodel(self, model: SubsurfaceModel): | |
| self.models.append(model) | |
| @property | |
| def line(self): | |
| return self._line | |
| @line.setter | |
| def line(self, value): | |
| if isinstance(value, LineString): | |
| self._line = value | |
| else: | |
| self._line = asShape({"type": "LineString", "coordinates": value}) | |
| @property | |
| def coords(self): | |
| coord_param = (self.line, self.resolution) | |
| if not self._coords or self._coords[0] != coord_param: | |
| self._coords = [ | |
| coord_param, | |
| np.array([ | |
| tuple(x[0] for x in self.line.interpolate(d).xy) | |
| for d in self.distance | |
| ]), | |
| ] | |
| return self._coords[1] | |
| @property | |
| def distance(self): | |
| return np.append( | |
| np.arange(0, self.line.length, self.resolution), self.line.length | |
| ) | |
| def plot_subsurfacemodels(self): | |
| distance, coords = self.distance, self.coords | |
| patches = [] | |
| for model in self.models: | |
| for layer in model: | |
| top, base = [np.array(x) for x in zip(*layer.sample(coords))] | |
| if np.isnan(top).all() or np.isnan(base).all(): | |
| continue | |
| if self.ylim: | |
| if np.nanmax(top) < min(self.ylim) or np.nanmin(base) > max( | |
| self.ylim | |
| ): | |
| continue | |
| patches.append( | |
| self.ax.fill_between( | |
| distance, | |
| top, | |
| base, | |
| label=layer.label, | |
| where=np.greater(top - base, 0, where=~np.isnan(top)), | |
| facecolor=layer.facecolor, | |
| edgecolor=layer.edgecolor, | |
| alpha=layer.alpha, | |
| linewidth=layer.linewidth, | |
| ) | |
| ) | |
| return patches | |
| def plot_legend(self, **kwargs): | |
| self.ax.legend(**kwargs) | |
| def plot_bends(self, **kwargs): | |
| distance = 0 | |
| for coord0, coord1 in zip(self.line.coords[:-2], self.line.coords[1:-1]): | |
| distance += LineString([coord0, coord1]).length | |
| self.ax.axvline(distance, **kwargs) | |
| def plot(self, *, legend=True, bends=True, legend_kwargs={}, bend_kwargs={}): | |
| layer_patches = self.plot_subsurfacemodels() | |
| if legend: | |
| _legend_kwargs = { | |
| "ncol": 1, | |
| "loc": "center left", | |
| "bbox_to_anchor": (1, 0.5), | |
| } | |
| _legend_kwargs.update(legend_kwargs) | |
| self.plot_legend(**_legend_kwargs) | |
| if bends: | |
| _bend_kwargs = {"color": "k", "linestyle": "-.", "linewidth": 0.5} | |
| _bend_kwargs.update(bend_kwargs) | |
| self.plot_bends(**_bend_kwargs) | |
| self.ax.set_xlim(0, tmp_cs.line.length) | |
| self.ax.set_ylim(self.ylim) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment