#!/usr/bin/env python """pipeline: Detection and other model pipeline.""" import json import os.path from PIL import Image from pycs.utils import Errorable from pycs.utils import Video from .detection import Detector from .features import Features class Pipeline(Errorable): def __init__(self, config): Errorable.__init__(self) self.config = config self.detector, self.features = self._load_distribution() self._err_children += [self.detector, self.features] def _load_distribution(self): try: distribution_path = self.config['model-distribution'] with open(os.path.join(distribution_path, 'distribution.json'), 'r') as distribution_json_file: distribution_json = json.load(distribution_json_file) detector_config = distribution_json['detection'] features_config = distribution_json['features'] except: self._report_error("Could not parse the distribution configuration") # TODO nothing is returned if no exception occurs return None, None try: detector = self._load_detector(detector_config) except: self._report_error("Could not load the detector") return None, None try: features = self._load_features(features_config) except: # TODO detector should not be closed manually detector.close() self._report_error("Could not load the feature extraction mechanism") return None, None return detector, features def _load_detector(self, config): detector = Detector(config={**config, 'distribution-root': self.config['model-distribution']}) return detector def _load_features(self, config): features = Features(config={**config, 'distribution-root': self.config['model-distribution']}) return features def execute(self, subjobs, callback): callback(0) subjob_count = float(len(subjobs)) for index, subjob in enumerate(subjobs): subjob_name = subjob['subjob'] prediction = subjob['prediction'] jobinfo = subjob['jobinfo'] jobinfo[subjob_name] = {'done-by': 'pipeline'} if subjob_name == 'detect-faces': # Run face detection if self.detector.last_error is not None: jobinfo[subjob_name]['error'] = self.detector.last_error jobinfo[subjob_name]['result'] = False else: filename = subjob['filename'] # Acquire image if subjob['filetype'] == 'image': img = Image.open(filename) elif subjob['filetype'] == 'video': if 'cap' in subjob.keys(): cap = subjob['cap'] else: cap = Video(filename) if cap.last_error is None: jobinfo[subjob_name]['error'] = cap.last_error else: jobinfo[subjob_name]['result'] = False continue img = cap.get_frame(subjob['frame']) else: jobinfo[subjob_name]['error'] = 'File format not supported!' jobinfo[subjob_name]['result'] = False continue faces = self.detector.detect_faces(img) if self.detector.last_error is not None: jobinfo[subjob_name]['error'] = self.detector.last_error jobinfo[subjob_name]['result'] = False else: prediction['faces'] = faces jobinfo[subjob_name]['result'] = True else: jobinfo[subjob_name]['result'] = False callback(float(index) / subjob_count) callback(1) def close(self): self.detector.close() self.features.close()