from typing import List

from pycs.pipeline.Fit import Fit
from pycs.pipeline.Job import Job


class Pipeline:
    """
    Common interface of a pipeline.

    The constructor and every instance method of this class raise
    NotImplementedError, so a concrete pipeline has to provide its own
    implementation of them. The static ``create_*_result`` helpers build
    plain result dictionaries (presumably the entries of the list
    returned by ``execute`` - confirm against the callers).
    """

    def __init__(self, root_folder, distribution):
        """
        Set up whatever is required so that jobs can be run afterwards.

        :param root_folder: relative path to the model folder
        :param distribution: object parsed from distribution.json
        """
        raise NotImplementedError

    def close(self):
        """
        Called whenever the pipeline is no longer needed; implementations
        should release their native resources here.

        :return:
        """
        raise NotImplementedError

    def execute(self, job: Job) -> List[dict]:
        """
        Take a job, run it and hand back the predicted result.

        :param job: the job that should be executed
        :return: list of result dictionaries
        """
        raise NotImplementedError

    def fit(self, fit: List[Fit]):
        """
        Fit the pipeline using the given list of Fit objects.

        NOTE(review): the exact contract (what a Fit carries, what is
        returned) is not visible in this file - confirm before relying
        on it.

        :param fit: list of Fit objects
        """
        raise NotImplementedError

    @staticmethod
    def create_labeled_image_result(label):
        """
        Build a result dictionary of type 'labeled-image'.

        :param label: value stored under the 'label' key
        :return: dict with the keys 'type' and 'label'
        """
        return dict(type='labeled-image', label=label)

    @staticmethod
    def create_bounding_box_result(x, y, w, h):
        """
        Build a result dictionary of type 'bounding-box'.

        The values are stored as given; presumably x/y are the position
        and w/h the size of the box - units are not defined here.

        :param x: value stored under the 'x' key
        :param y: value stored under the 'y' key
        :param w: value stored under the 'w' key
        :param h: value stored under the 'h' key
        :return: dict with the keys 'type', 'x', 'y', 'w' and 'h'
        """
        return dict(type='bounding-box', x=x, y=y, w=w, h=h)

    @staticmethod
    def create_labeled_bounding_box_result(x, y, w, h, label):
        """
        Build a result dictionary of type 'labeled-bounding-box'.

        The values are stored as given; presumably x/y are the position
        and w/h the size of the box - units are not defined here.

        :param x: value stored under the 'x' key
        :param y: value stored under the 'y' key
        :param w: value stored under the 'w' key
        :param h: value stored under the 'h' key
        :param label: value stored under the 'label' key
        :return: dict with the keys 'type', 'x', 'y', 'w', 'h' and 'label'
        """
        return dict(
            type='labeled-bounding-box',
            x=x,
            y=y,
            w=w,
            h=h,
            label=label,
        )