@@ -4,61 +4,12 @@ import typing as T

from munch import munchify
from collections import namedtuple
-from skimage import filters
-
-# the coordinates are relative!
-BBoxInfo = namedtuple("BBoxInfo", "area ratio mean std selected", defaults=[-1, -1, -1, -1, False])
-Detection = namedtuple("Detection", "bbox info")
-
-class BBox(namedtuple("BBox", "x0 y0 x1 y1")):
-    __slots__ = ()
-
-    @property
-    def w(self):
-        return abs(self.x1 - self.x0)
-
-    @property
-    def h(self):
-        return abs(self.y1 - self.y0)
-
-
-    @property
-    def area(self):
-        return self.h * self.w
-
-    @property
-    def ratio(self):
-        return min(self.h, self.w) / max(self.h, self.w)
-
-    def crop(self, im: np.ndarray, enlarge: bool = True):
-
-        x0, y0, x1, y1 = self
-        H, W, *_ = im.shape
-
-        # translate from relative coordinates to pixel
-        # coordinates for the given image
-
-        x0, x1 = int(x0 * W), int(x1 * W)
-        y0, y1 = int(y0 * H), int(y1 * H)
-
-        # enlarge to a square extent
-        if enlarge:
-            h, w = int(self.h * H), int(self.w * W)
-            size = max(h, w)
-            dw, dh = (size - w) / 2, (size - h) / 2
-            x0, y0 = max(int(x0 - dw), 0), max(int(y0 - dh), 0)
-            x1, y1 = int(x0 + size), int(y0 + size)
-
-
-        if im.ndim == 2:
-            return im[y0:y1, x0:x1]
-
-        elif im.ndim == 3:
-            return im[y0:y1, x0:x1, :]
-
-        else:
-            ValueError(f"Unsupported ndims: {im.ndims=}")

+from blob_detector import utils
+from blob_detector.core.bbox import BBox
+from blob_detector.core.bbox_proc import Splitter
+from blob_detector.core.binarizers import BinarizerType
+from blob_detector.core.pipeline import Pipeline

class Detector(object):

@@ -67,148 +18,51 @@ class Detector(object):
        super().__init__()
        config = munchify(configuration)

-        self.scale: float = config.preprocess.scale
-        self.min_size: int = config.preprocess.min_size
-        self.sigma: float = config.preprocess.sigma
-
-        self.block_size_scale: float = config.threshold.block_size_scale
-
-        self.dilate_iterations: int = config.postprocess.dilate_iterations
-        self.kernel_size: int = config.postprocess.kernel_size
-
-
-    def __call__(self, im: np.ndarray) -> T.List[Detection]:
-
-        _im = self.rescale(im)
-
-        im0 = self.preprocess(_im)
-        im1 = self.threshold(im0)
-        im2 = self.postprocess(im1)
-
-        bboxes = self.detect(im2)
-
-        return self.postprocess_boxes(_im, bboxes)
-
-
-    def detect(self, im: np.ndarray) -> T.List[BBox]:
+        self.img_proc = Pipeline()
+        self.img_proc.rescale(
+            min_size=config.preprocess.min_size,
+            min_scale=config.preprocess.scale)

-        contours, hierarchy = cv2.findContours(im, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
-        contours = sorted(contours, key=cv2.contourArea, reverse=True)
+        self.img_proc.find_border()

-        return [_contour2bbox(c, im.shape) for c in contours]
+        self.img_proc.preprocess(
+            equalize=False, sigma=config.preprocess.sigma)

-
-    def rescale(self, im: np.ndarray) -> np.ndarray:
-
-        H, W = im.shape
-        _scale = self.min_size / min(H, W)
-        scale = max(self.scale, min(1, _scale))
-        size = int(W * scale), int(H * scale)
-
-        return cv2.resize(im, dsize=size)
-
-
-    def preprocess(self, im: np.ndarray) -> np.ndarray:
-        res = filters.gaussian(im, sigma=self.sigma, preserve_range=True)
-        return res.astype(im.dtype)
-
-
-    def threshold(self, im: np.ndarray) -> np.ndarray:
-        block_size_scale = self.block_size_scale
-
-        # make block size an odd number
-        block_size = min(im.shape) * block_size_scale // 2 * 2 + 1
-
-        thresh = filters.threshold_local(im,
-            block_size=block_size,
-            mode="constant",
+        self.img_proc.binarize(
+            type=BinarizerType.gauss_local,
+            use_masked=True,
+            use_cv2=True,
+            sigma=5.0,
+            window_size=config.threshold.window_size,
+            offset=2.0,
        )

-        max_value = 255
-        bin_im = ((im > thresh) * max_value).astype(np.uint8)
-        return max_value - bin_im
-
-
-    def postprocess(self, im: np.ndarray) -> np.ndarray:
-        kernel_size = self.kernel_size
-        iterations = self.dilate_iterations
-        kernel = np.ones((kernel_size, kernel_size), dtype=np.uint8)
-
-        im = cv2.morphologyEx(im, cv2.MORPH_OPEN, kernel)
-        im = cv2.morphologyEx(im, cv2.MORPH_CLOSE, kernel)
-
-        if iterations >= 1:
-            im = cv2.erode(im, kernel, iterations=iterations)
-            im = cv2.dilate(im, kernel, iterations=iterations)
+        self.img_proc.remove_border()

-        return im
+        self.img_proc.open_close(
+            kernel_size=config.postprocess.kernel_size,
+            iterations=config.postprocess.dilate_iterations)

+        self.bbox_proc = Pipeline()
+        self.bbox_proc.detect()
+        _, splitter = self.bbox_proc.split_bboxes(
+            preproc=Pipeline(), detector=Pipeline())
+        _, bbox_filter = self.bbox_proc.bbox_filter(
+            score_threshold=0.5,
+            nms_threshold=0.3,
+            enlarge=0.01,
+        )
+        _, scorer = self.bbox_proc.score()

-    def postprocess_boxes(self, im: np.ndarray, bboxes: T.List[BBox]):
-
-        detections = [Detection(bbox, BBoxInfo()) for bbox in bboxes]
-
-        _im = im.astype(np.float64) / 255.
-        integral, integral_sq = cv2.integral2(_im)
-        # im_mean, im_std, im_n = _im_mean_std(integral, integral_sq)
-
-        inds = cv2.dnn.NMSBoxes([[x0, y0, x1-x0, y1-y0] for (x0, y0, x1, y1) in bboxes],
-            np.ones(len(bboxes), dtype=np.float32),
-            score_threshold=0.99,
-            nms_threshold=0.1,
-        )
-
-        # calculate the BBoxInfos only for the selected and update the detections
-        for i in inds.squeeze():
-            bbox, _ = detections[i]
-            mean, std, n = _im_mean_std(integral, integral_sq, bbox)
-            area, ratio = bbox.area, bbox.ratio
-            selected = self.is_selected(mean, std, ratio, area)
-            info = BBoxInfo(mean, std, area, ratio, selected)
-            detections[i] = Detection(bbox, info)
-
-        return detections
-
-    def is_selected(self, mean: float, std: float, ratio: float, area: float) -> bool:
-        # Caution, here are some magic numbers!
-        return \
-            std >= 5e-2 and \
-            ratio >= 2.5e-1 and \
-            4e-4 <= area <= 1/9
-
-def _contour2bbox(contour: np.ndarray, shape: T.Tuple[int, int]) -> BBox:
-    """ Gets the maximal extent of a contour and translates it to a bounding box. """
-    x0, y0 = contour.min(axis=0)[0].astype(np.int32)
-    x1, y1 = contour.max(axis=0)[0].astype(np.int32)
-
-    h, w = shape
-    return BBox(x0/w, y0/h, x1/w, y1/h)
-
-
-def _im_mean_std(integral: np.ndarray,
-                 integral_sq: np.ndarray,
-                 bbox: T.Optional[BBox] = None
-                 ) -> T.Tuple[float, float, int]:
-
-    h, w = integral.shape[0] - 1, integral.shape[1] - 1
-
-    if bbox is None:
-        arr_sum = integral[-1, -1]
-        arr_sum_sq = integral_sq[-1, -1]
-        N = h * w
+        self.img_proc.requires_input(splitter.set_image)
+        self.img_proc.requires_input(bbox_filter.set_image)
+        self.img_proc.requires_input(scorer.set_image)

-    else:
-        x0, y0, x1, y1 = bbox
-        x0, x1 = int(x0 * w), int(x1 * w)
-        y0, y1 = int(y0 * h), int(y1 * h)

-        A, B, C, D = (y0,x0), (y1,x0), (y0,x1), (y1,x1)
-        arr_sum = integral[D] + integral[A] - integral[B] - integral[C]
-        arr_sum_sq = integral_sq[D] + integral_sq[A] - integral_sq[B] - integral_sq[C]

-        N = (x1-x0) * (y1-y0)
+    def __call__(self, im: np.ndarray) -> T.List[BBox]:

-    arr_mean = arr_sum / N
-    arr_std = np.sqrt((arr_sum_sq - (arr_sum**2) / N) / N)
+        _im = self.img_proc(im)
+        bboxes, labels, scores = self.bbox_proc(_im)

-    return arr_mean, arr_std, N
+        return bboxes
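For reference, a minimal usage sketch of the refactored Detector after this patch. The configuration keys are the ones read in __init__ above (preprocess.min_size, preprocess.scale, preprocess.sigma, threshold.window_size, postprocess.kernel_size, postprocess.dilate_iterations); the numeric values, the import path of the detector module, and the assumption that the constructor takes only this configuration mapping are illustrative guesses, not taken from the patch.

import numpy as np

# assumed module path for the class patched above
from blob_detector.core.detector import Detector

# hypothetical values; only the key layout comes from the hunks above
configuration = dict(
    preprocess=dict(min_size=1080, scale=0.1, sigma=5.0),
    threshold=dict(window_size=31),
    postprocess=dict(kernel_size=5, dilate_iterations=2),
)

detector = Detector(configuration)

# grayscale uint8 input; __call__ runs img_proc, then bbox_proc, and returns the boxes
im = np.random.randint(0, 255, size=(768, 1024), dtype=np.uint8)
bboxes = detector(im)  # -> T.List[BBox]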