123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- import cv2
- import uuid
- import numpy as np
- import typing as T
- from flask import abort
- from flask import make_response
- from flask import request
- from flask.views import View
- from pycs import db
- from pycs.database.File import File
- from pycs.database.Result import Result
- from pycs.frontend.notifications.NotificationManager import NotificationManager
- from pycs.jobs.JobGroupBusyException import JobGroupBusyException
- from pycs.jobs.JobRunner import JobRunner
- class EstimateBoundingBox(View):
- """
- create a result for a file
- """
- # pylint: disable=arguments-differ
- methods = ['POST']
- def __init__(self, nm: NotificationManager, jobs: JobRunner,):
- # pylint: disable=invalid-name
- self.nm = nm
- self.jobs = jobs
- def dispatch_request(self, file_id: int):
- file = File.get_or_404(file_id)
- request_data = request.get_json(force=True)
- if 'x' not in request_data or 'y' not in request_data:
- abort(400, "coordinates for the estimation are missing")
- x,y = map(request_data.get, "xy")
- # get project
- project = file.project
- try:
- rnd = str(uuid.uuid4())[:10]
- self.jobs.run(project,
- "Estimation",
- f'{project.name} (create predictions)',
- f"{project.id}/estimation/{rnd}",
- estimate,
- file.id, x, y,
- result=self.nm.create_result
- )
- except JobGroupBusyException:
- abort(400, "Job is already running!")
- return make_response()
- def estimate(file_id: int, x: float, y: float) -> Result:
- file = File.query.get(file_id)
- im = cv2.imread(file.absolute_path, cv2.IMREAD_GRAYSCALE)
- h, w = im.shape
- pos = int(x * w), int(y * h)
- x0, y0, x1, y1 = detect(im, pos,
- window_size=1000,
- pixel_delta=50,
- enlarge=1e-2,
- )
- data = dict(
- x=x0 / w,
- y=y0 / h,
- w=(x1-x0) / w,
- h=(y1-y0) / h
- )
- return file.create_result('pipeline', 'bounding-box', label=None, data=data)
- def detect(im: np.ndarray,
- pos: T.Tuple[int, int],
- window_size: int = 1000,
- pixel_delta: int = 0,
- enlarge: float = -1) -> T.Tuple[int, int, int, int]:
- # im = blur(im, 3)
- x, y = pos
- pixel = im[y, x]
- min_pix, max_pix = pixel - pixel_delta, pixel + pixel_delta
- mask = np.logical_and(min_pix < im, im < max_pix).astype(np.float32)
- # mask = open_close(mask)
- # mask = blur(mask)
- pad = window_size // 2
- mask = np.pad(mask, pad, mode="constant")
- window = mask[y: y + window_size, x: x + window_size]
- sum_x, sum_y = window.sum(axis=0), window.sum(axis=1)
- enlarge = int(enlarge * max(im.shape))
- (x0, x1), (y0, y1) = get_borders(sum_x, enlarge), get_borders(sum_y, enlarge)
- x0 = max(x + x0 - pad, 0)
- y0 = max(y + y0 - pad, 0)
- x1 = min(x + x1 - pad, im.shape[1])
- y1 = min(y + y1 - pad, im.shape[0])
- return x0, y0, x1, y1
- def get_borders(arr, enlarge: int, eps=5e-1):
- mid = len(arr) // 2
- arr0, arr1 = arr[:mid], arr[mid:]
- thresh = arr[mid] * eps
- lowers = np.where(arr0 < thresh)[0]
- lower = 0 if len(lowers) == 0 else lowers[-1]
- uppers = np.where(arr1 < thresh)[0]
- upper = arr1.argmin() if len(uppers) == 0 else uppers[0]
- # since the second half starts after the first
- upper = len(arr0) + upper
- if enlarge > 0:
- lower = max(lower - enlarge, 0)
- upper = min(upper + enlarge, len(arr)-1)
- return int(lower), int(upper)
- """
- def blur(im, sigma=5):
- from skimage import filters
- return filters.gaussian(im, sigma=sigma, preserve_range=True)
- def open_close(im, kernel_size=3):
- kernel = np.ones((kernel_size, kernel_size), dtype=np.uint8)
- im = cv2.morphologyEx(im, cv2.MORPH_OPEN, kernel)
- im = cv2.morphologyEx(im, cv2.MORPH_CLOSE, kernel)
- return im
- """
|