Source code for geographer.converters.combine_remove_vector_classes

"""Combine and/or remove vector feature classes.

Create a new dataset from an existing one by combining and/or removing
vector feature classes.
"""

from __future__ import annotations

import logging
import shutil
from typing import Optional, Union

import pandas as pd
from geopandas.geodataframe import GeoDataFrame
from pydantic import Field
from tqdm.auto import tqdm

from geographer import Connector
from geographer.creator_from_source_dataset_base import DSCreatorFromSource
from geographer.global_constants import VECTOR_FEATURES_INDEX_NAME
from geographer.label_makers.label_maker_base import LabelMaker
from geographer.utils.utils import deepcopy_gdf

# Module-level logger. Obtained through ``logging.getLogger`` (rather than
# instantiating ``logging.Logger`` directly) so that it is registered in the
# logger hierarchy and inherits handlers/levels configured by the application.
log = logging.getLogger(__name__)


class DSConverterCombineRemoveClasses(DSCreatorFromSource):
    """Class for combining and/or removing vector feature classes.

    For creating a new dataset from an existing one by combining and/or
    removing vector feature classes.

    The ``classes`` field lists the vector feature classes of the source
    dataset that are to be kept. Each entry is either a single class name
    or a list of class names that will be merged into one new class. E.g.
    ``[['ct', 'ht'], 'wr', ['h']]`` combines the 'ct' and 'ht' classes and
    keeps the 'wr' and 'h' classes.
    """

    classes: list[Union[str, list[str]]] = Field(
        description="Classes to keep and combine. See docstring."
    )
    new_class_names: Optional[list[str]] = Field(
        default=None, description="Names of new classes"
    )
    class_separator: str = Field(
        default="+", description="Separator used when combining class names."
    )
    new_background_class: Optional[str] = Field(
        default=None, description="Class to be set as new background class"
    )
    remove_rasters: bool = Field(
        default=True,
        description="Whether to remove rasters not containing new classes from disk",
    )
    label_maker: Optional[LabelMaker] = Field(
        default=None, description="Optional LabelMaker. If given, will update labels."
    )

    def _create(self):
        """Create the target dataset (same code path as updating)."""
        self._create_or_update()

    def _update(self):
        """Update the target dataset (same code path as creating)."""
        self._create_or_update()

    def _create_or_update(self) -> Connector:
        """Combine and/or remove vector feature classes.

        Create a new dataset/connector from an existing one by combining
        and/or removing vector feature classes. Works for both categorical
        and soft-categorical label types.

        Warning:
            Will only add rasters and vector features from the source
            dataset, which is assumed to have grown in size. Deletions in
            the source dataset will not be inherited.

        The behavior is controlled by the following fields:

            classes: vector feature classes in existing dataset/connector
                to be kept and combined in new dataset/connector. E.g.
                [['ct', 'ht'], 'wr', ['h']] will combine the 'ct' and 'ht'
                classes, and also keep the 'wr' and 'h' classes. Along
                with the regular vector feature classes one may also use
                the background class here.
            new_class_names: optional list of names of new vector feature
                classes corresponding to classes. Defaults to joining the
                names of the existing classes using class_separator.
            class_separator: used if new_class_names is not provided to
                join the names of existing vector feature classes that
                are to be kept. Defaults to "+".
            new_background_class: optional new background class, defaults
                to None, i.e. old background class.
            remove_rasters: If True, only rasters containing vector
                features belonging to the vector feature classes to be
                kept are copied to the target dataset.

        Returns:
            The Connector of the new dataset.

        Note:
            For the purposes of this function the background classes will
            be treated as regular vector feature classes. In particular,
            if you do not include them in the classes argument, vector
            features of the background class will be lost.
        """
        # Determine classes: convert strings in classes to singleton lists
        classes = [
            entry if isinstance(entry, list) else [entry] for entry in self.classes
        ]
        classes_to_keep = [
            class_ for list_of_classes in classes for class_ in list_of_classes
        ]
        new_class_names = self._get_new_class_names(classes)

        self._run_safety_checks(classes_to_keep, new_class_names)

        # Set information about background ...
        if self.new_background_class is not None:
            self.target_connector.background_class = self.new_background_class
        elif self.source_connector.background_class not in new_class_names:
            self.target_connector.attrs["background_class"] = None

        # ... and vector feature classes in self.target_connector.
        self.target_connector.task_vector_classes = [
            class_
            for class_ in new_class_names
            if class_ != self.target_connector.background_class
        ]

        vectors_from_source_df = self._combine_or_remove_classes_from_vectors(
            label_type=self.source_connector.label_type,
            vectors=self.source_connector.vectors,
            all_source_vector_classes=(self.source_connector.all_vector_classes),
            classes=classes,
            new_class_names=new_class_names,
        )

        # need this later
        vectors_to_add_to_target_dataset = set(vectors_from_source_df.index) - set(
            self.target_connector.vectors.index
        )

        # THINK ABOUT THIS!!!!
        # if we are creating a new soft-categorical dataset adjust columns
        # of empty self.target_connector.vectors
        if (
            len(self.target_connector.vectors) == 0
            and self.target_connector.label_type == "soft-categorical"
        ):
            empty_vectors_with_corrected_columns = (
                self._combine_or_remove_classes_from_vectors(
                    label_type="soft-categorical",
                    vectors=self.target_connector.vectors,
                    all_source_vector_classes=(
                        self.source_connector.all_vector_classes
                    ),
                    classes=classes,
                    new_class_names=new_class_names,
                )
            )
            self.target_connector.vectors = empty_vectors_with_corrected_columns

        self.target_connector.add_to_vectors(
            vectors_from_source_df.loc[list(vectors_to_add_to_target_dataset)]
        )

        # Determine which rasters to copy to target dataset
        rasters_in_target_dataset_before_addings_rasters_from_source_dataset = (
            {
                raster_path.name
                for raster_path in self.target_connector.rasters_dir.iterdir()
            }
            if self.target_connector.rasters_dir.exists()
            else set()
        )
        rasters_in_source_rasters_dir = {
            raster_path.name
            for raster_path in self.source_connector.rasters_dir.iterdir()
        }
        if self.remove_rasters:
            rasters_in_source_that_should_be_in_target = {
                # all rasters in the source dataset ...
                raster_name
                for raster_name in self.source_connector.rasters.index
                # ... that intersect with the vector features that will be kept.
                if (
                    not set(
                        self.source_connector.vectors_intersecting_raster(raster_name)
                    ).isdisjoint(vectors_from_source_df.index)
                )
                and (self.source_connector.rasters_dir / raster_name).exists()
            }
        else:
            rasters_in_source_that_should_be_in_target = rasters_in_source_rasters_dir
        rasters_to_copy_to_target_dataset = (
            rasters_in_source_that_should_be_in_target
            - rasters_in_target_dataset_before_addings_rasters_from_source_dataset
        )

        # Copy those rasters
        self.target_connector.rasters_dir.mkdir(parents=True, exist_ok=True)
        for raster_name in tqdm(
            rasters_to_copy_to_target_dataset, desc="Copying rasters"
        ):
            source_raster_path = self.source_connector.rasters_dir / raster_name
            target_raster_path = self.target_connector.rasters_dir / raster_name
            shutil.copyfile(source_raster_path, target_raster_path)

        # add rasters to self.target_connector
        df_of_rasters_to_add_to_target_dataset = self.source_connector.rasters.loc[
            list(rasters_to_copy_to_target_dataset)
        ]
        self.target_connector.add_to_rasters(df_of_rasters_to_add_to_target_dataset)

        if self.label_maker is not None:
            # Determine labels to delete:
            # For each raster that already existed in the target dataset ...
            for (
                raster_name
            ) in rasters_in_target_dataset_before_addings_rasters_from_source_dataset:
                # ... if among the vector features intersecting it
                # in the target dataset ...
                vectors_intersecting_raster = set(
                    self.target_connector.vectors_intersecting_raster(raster_name)
                )
                # ... there is a *new* (vector) geometry ...
                if (
                    vectors_intersecting_raster & vectors_to_add_to_target_dataset
                    != set()
                ):
                    # ... then we need to update the label for it,
                    # so we delete the current label.
                    self.label_maker.delete_labels(
                        connector=self.target_connector, raster_names=[raster_name]
                    )

            # make labels
            self.label_maker.make_labels(connector=self.target_connector)

        # remember original type
        if self.target_connector.label_type == "categorical":
            self.target_connector.vectors.loc[
                list(vectors_to_add_to_target_dataset), "orig_type"
            ] = self.source_connector.vectors.loc[
                list(vectors_to_add_to_target_dataset), "type"
            ]

        return self.target_connector

    def _get_new_class_names(self, classes: list[list[str]]) -> list[str]:
        """Return the names of the new (combined) classes.

        Args:
            classes: list of lists of source class names; each inner list
                is merged into one new class.

        Returns:
            ``self.new_class_names`` if set, else the names of each inner
            list joined by ``self.class_separator``.

        Raises:
            AssertionError: if the names are not distinct or their number
                differs from the number of entries in ``classes``.
        """
        if self.new_class_names is None:
            new_class_names = [
                self.class_separator.join(list_of_classes)
                for list_of_classes in classes
            ]
        else:
            new_class_names = self.new_class_names

        assert len(new_class_names) == len(
            set(new_class_names)
        ), "new_class_names need to be distinct!"
        assert len(new_class_names) == len(
            classes
        ), "there should be as many new_class_names as there are classes!"

        return new_class_names

    def _run_safety_checks(
        self, classes_to_keep: list[str], new_class_names: list[str]
    ):
        """Validate the requested class configuration.

        Args:
            classes_to_keep: flat list of all source classes to be kept.
            new_class_names: resolved names of the new classes.

        Raises:
            ValueError: if a class to keep is not a class of the source
                dataset, if a source class is assigned to more than one
                new class, or if ``new_background_class`` is not one of
                the new class names.
        """
        source_classes = set(self.source_connector.all_vector_classes)
        if not set(classes_to_keep) <= source_classes:
            classes_not_in_source_dataset = set(classes_to_keep) - source_classes
            raise ValueError(
                "The following classes are not in "
                "self.source_connector.all_vector_classes: "
                f"{classes_not_in_source_dataset}"
            )

        if len(classes_to_keep) != len(set(classes_to_keep)):
            raise ValueError(
                "a vector feature class in the source dataset "
                "can only be in at most one of the new classes"
            )

        if (
            self.new_background_class is not None
            and self.new_background_class not in new_class_names
        ):
            # Report the resolved names: self.new_class_names is None when
            # the names were derived by joining with class_separator.
            raise ValueError(f"new_background_class not in {new_class_names}")

    def _combine_or_remove_classes_from_vectors(
        self,
        label_type: str,
        vectors: GeoDataFrame,
        classes: list[str | list[str]],
        new_class_names: list[str],
        all_source_vector_classes: list[str],
    ) -> GeoDataFrame:
        """Combine and/or remove classes from vectors geodataframe.

        Works on a deep copy; the ``vectors`` argument is not modified.

        Args:
            label_type: "categorical" or "soft-categorical".
            vectors: geodataframe of vector features. For "categorical"
                a "type" column is read; for "soft-categorical" the
                "prob_of_class_<class>" columns are read.
            classes: list of lists of source classes; each inner list is
                merged into the new class at the same position in
                new_class_names.
            new_class_names: names of the new classes.
            all_source_vector_classes: all classes of the source dataset;
                used to determine which probability columns to drop.

        Returns:
            GeoDataFrame restricted to features of the kept classes, with
            class information expressed in terms of the new classes.

        Raises:
            ValueError: if label_type is unknown.
        """
        if label_type not in {"categorical", "soft-categorical"}:
            raise ValueError(f"Unknown label_type: {label_type}")

        vectors = deepcopy_gdf(vectors)

        classes_to_keep = [
            class_ for list_of_classes in classes for class_ in list_of_classes
        ]

        if label_type == "categorical":

            def get_new_class(class_: str) -> str:
                for count, classes_ in enumerate(classes):
                    if class_ in classes_:
                        return new_class_names[count]

            # keep only vector features belonging to vector feature we want to keep
            vectors = vectors.loc[vectors["type"].isin(classes_to_keep)]
            # rename to new classes
            vectors.loc[:, "type"] = vectors["type"].apply(get_new_class)

        elif label_type == "soft-categorical":
            prob_col_prefix = "prob_of_class_"

            def prob_of_class_names(classes: list[str]) -> list[str]:
                return [f"{prob_col_prefix}{class_}" for class_ in classes]

            # drop cols of classes we don't want to keep
            classes_to_drop = [
                class_
                for class_ in all_source_vector_classes
                if class_ not in classes_to_keep
            ]
            cols_to_drop = prob_of_class_names(classes_to_drop)
            vectors = vectors.drop(columns=cols_to_drop)

            # create temporary dataframe to avoid column name conflicts
            # when renaming/deleting etc
            temp_vectors = pd.DataFrame()
            temp_vectors.index.name = vectors.index.name

            # for each row/(vector) geometry find sum of probabilities
            # for the remaining vector feature classes
            cols_with_probs_of_remaining_classes = prob_of_class_names(classes_to_keep)
            sum_of_probs_of_remaining_classes = pd.DataFrame(
                vectors[cols_with_probs_of_remaining_classes].sum(axis=1),
                columns=["sum"],
                index=vectors.index,
            )
            rows_where_sum_is_zero = sum_of_probs_of_remaining_classes["sum"] == 0

            # remove rows/vector features which do not belong to remaining classes
            vectors = vectors.loc[~rows_where_sum_is_zero]
            sum_of_probs_of_remaining_classes = sum_of_probs_of_remaining_classes.loc[
                ~rows_where_sum_is_zero
            ]

            # renormalize probabilities to sum to 1
            vectors.loc[:, cols_with_probs_of_remaining_classes] = vectors[
                cols_with_probs_of_remaining_classes
            ].div(sum_of_probs_of_remaining_classes["sum"], axis=0)

            # combine probabilities of new_classes and drop old classes
            for classes_of_new_class, new_class_name in zip(classes, new_class_names):
                cols_of_probs_to_be_added = prob_of_class_names(classes_of_new_class)
                temp_vectors[f"{prob_col_prefix}{new_class_name}"] = vectors[
                    cols_of_probs_to_be_added
                ].sum(axis=1)
                vectors = vectors.drop(columns=cols_of_probs_to_be_added)

            # add new columns
            vectors = GeoDataFrame(
                pd.concat([vectors, temp_vectors], axis=1),  # column axis
                crs=vectors.crs,
                geometry="geometry",
            )
            vectors.index.name = VECTOR_FEATURES_INDEX_NAME

            # Recompute most likely type column: comma-joined names of the
            # classes attaining the (non-zero) maximal probability. The
            # class name is the column name with the prefix stripped; slice
            # by the prefix length so no character of the name is lost.
            prefix_len = len(prob_col_prefix)
            vectors["most_likely_class"] = vectors[temp_vectors.columns].apply(
                lambda s: ",".join(
                    col_name[prefix_len:]
                    for col_name in s[(s == s.max()) & (s != 0)].index.values
                ),
                axis=1,
            )

        return vectors