|
@@ -0,0 +1,133 @@
|
|
|
+import re
|
|
|
+import numpy as np
|
|
|
+import pandas as pd
|
|
|
+import typing as T
|
|
|
+
|
|
|
+from pathlib import Path
|
|
|
+from munch import munchify
|
|
|
+
|
|
|
+from pycs import app
|
|
|
+from pycs.interfaces.LabelProvider import LabelProvider
|
|
|
+
|
|
|
class Provider(LabelProvider):
    """Label provider that builds a label hierarchy from a tab-separated table.

    The table is read from ``<root_folder>/<configuration.filename>``. Each
    row yields one leaf label, plus one parent label for every configured
    hierarchy level whose cell is filled in. Parent labels are created only
    once, the first time their reference is encountered.
    """

    # Column names applied to the table, in file order. The file's own header
    # row is discarded (``header=0`` combined with ``names=`` in ``read_csv``).
    names = [
        'is_local',
        'rarity',
        'super_family',
        'family',
        'sub_family',
        'tribe',
        'german',
        'swiss',
        'austrian',
        'kr_nr',
        'genus',
        'species',
        'authors',
        'comment',
        'remove_me',
        'changed',
        'version1_comment',
        'misc',  # 'D-CH-A / non-KR / Kaukasus',
        'german_name',
    ]

    # Per-column dtypes handed to ``pandas.read_csv``. Cells that are empty
    # (or, for the explicit categorical dtypes, not among the listed
    # categories) are read as NaN.
    dtype = {
        'is_local': pd.CategoricalDtype(['nur lokal', 'tagaktiv']),
        'rarity': np.float32,
        'super_family': "category",
        'family': "category",
        'sub_family': "category",
        'tribe': "category",
        'german': pd.CategoricalDtype(['D', 'e', '?']),
        'swiss': pd.CategoricalDtype(['C', 'e', '?']),
        'austrian': pd.CategoricalDtype(['A', 'e', '?']),
        'kr_nr': "object",
        'genus': "category",
        'species': "category",
        'authors': "object",
        'comment': "object",
        'remove_me': "category",
        'changed': "object",
        'version1_comment': "object",
        'misc': "object",
        'german_name': str,
    }

    # A ``kr_nr`` cell is used as the label reference only if it starts with
    # at least one digit, ASCII letter or hyphen.
    KR_REGEX = re.compile(r"^[\d\-a-zA-Z]+")

    def __init__(self, root_folder: str, configuration: T.Dict):
        """Store the settings needed to read and filter the label table.

        Args:
            root_folder: Directory that contains the label file.
            configuration: Mapping with the keys ``filename``,
                ``minimumRarity``, ``hierarchyLevels`` and ``onlyGerman``.
                ``hierarchyLevels`` is iterated as ``(column, level_name)``
                pairs by :meth:`get_labels`.
        """
        config = munchify(configuration)
        self.root = Path(root_folder)

        self.label_file = self.root / config.filename
        self.min_rarity = config.minimumRarity
        self.hierarchy_levels = config.hierarchyLevels
        self.only_german = config.onlyGerman

    def close(self):
        """Do nothing; this provider keeps no open resources."""
        pass

    def get_labels(self) -> T.List[dict]:
        """Read the label table and return all labels, parents before children.

        Filtering steps, each applied only when configured:

        * ``min_rarity`` is not ``None``: keep rows with
          ``rarity >= min_rarity`` (rows without a rarity are dropped, since
          a comparison with NaN is false).
        * ``only_german`` is truthy: keep rows flagged ``"D"``, ``"A"`` or
          ``"C"`` in the ``german``/``austrian``/``swiss`` columns whose
          ``remove_me`` cell is empty.

        Returns:
            The objects produced by ``self.create_label`` -- first-seen
            hierarchy labels interleaved with the leaf label of each row.
        """
        lepi_list = pd.read_csv(
            self.label_file,
            names=self.names,
            dtype=self.dtype,
            sep="\t",
            header=0,
        )
        app.logger.info(f"Found {len(lepi_list)} labels in {self.label_file}")

        if self.min_rarity is not None:
            lepi_list = lepi_list[lepi_list.rarity >= self.min_rarity]
            app.logger.info(f"Labels {len(lepi_list):,d} with {self.min_rarity=}")

        if self.only_german:
            in_region = (
                lepi_list.german.eq("D")
                | lepi_list.austrian.eq("A")
                | lepi_list.swiss.eq("C")
            )
            # An empty ``remove_me`` cell is read as NaN; only those rows stay.
            mask = in_region & lepi_list["remove_me"].isna()

            lepi_list = lepi_list[mask]
            app.logger.info(f"Labels {len(lepi_list):,d} for german-speaking countries")

        result = []
        parents = set()
        for _, entry in lepi_list.iterrows():
            parent_reference = None

            for level, level_name in self.hierarchy_levels:
                level_entry = entry[level]
                # pandas represents a missing cell as NaN rather than None, so
                # an ``is None`` test would let it through and ``.lower()``
                # would fail on a float. Skip the level; the next filled level
                # attaches to the last known parent.
                if pd.isna(level_entry):
                    continue

                reference, name = f'{level}_{level_entry.lower()}', level_entry

                # parents should be added once
                if reference not in parents:
                    result.append(self.create_label(reference, name, parent_reference, level_name))
                    parents.add(reference)

                parent_reference = reference

            # add label itself
            # A missing ``kr_nr`` is a NaN float, which ``re`` cannot match
            # against; treat it like a non-matching value and fall back to a
            # reference derived from the name.
            kr_nr = entry.kr_nr
            if isinstance(kr_nr, str) and self.KR_REGEX.match(kr_nr):
                name = f'{entry.genus} {entry.species} ({entry.kr_nr})'
                reference = kr_nr

            else:
                name = f'{entry.genus} {entry.species}'
                reference = f'_{name.lower()}'
            result.append(self.create_label(reference, name, parent_reference))

        app.logger.info(f"Finally, provided {len(result):,d} labels")
        return result
|
|
|
+
|
|
|
+
|