123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- #!/usr/bin/env python
- """pipeline: Detection and other model pipeline."""
- import json
- import os.path
- from PIL import Image
- from pycs.utils import Errorable
- from pycs.utils import Video
- from .detection import Detector
- from .features import Features
class Pipeline(Errorable):
    """Model pipeline bundling a face ``Detector`` and a ``Features`` extractor.

    Both components are configured from the ``distribution.json`` file located
    in the directory named by ``config['model-distribution']``. When loading
    fails, the problem is reported through ``_report_error`` and both
    ``detector`` and ``features`` are ``None``.
    """

    def __init__(self, config):
        """Store *config* and load the detector and the feature extractor.

        Args:
            config: Mapping providing the ``'model-distribution'`` entry, i.e.
                the path of the model distribution directory.
        """
        Errorable.__init__(self)
        self.config = config
        self.detector, self.features = self._load_distribution()
        # NOTE(review): both entries are None when loading failed; confirm
        # that Errorable tolerates None children.
        self._err_children += [self.detector, self.features]

    def _load_distribution(self):
        """Read ``distribution.json`` and build the detector and features.

        Returns:
            A ``(detector, features)`` tuple, or ``(None, None)`` if any step
            failed. Failures are reported via ``_report_error``.
        """
        try:
            distribution_path = self.config['model-distribution']
            json_path = os.path.join(distribution_path, 'distribution.json')
            with open(json_path, 'r', encoding='utf-8') as distribution_json_file:
                distribution_json = json.load(distribution_json_file)
            detector_config = distribution_json['detection']
            features_config = distribution_json['features']
        except Exception:
            # Best effort: any configuration problem disables the pipeline,
            # but KeyboardInterrupt/SystemExit are no longer swallowed.
            self._report_error("Could not parse the distribution configuration")
            return None, None

        try:
            detector = self._load_detector(detector_config)
        except Exception:
            self._report_error("Could not load the detector")
            return None, None

        try:
            features = self._load_features(features_config)
        except Exception:
            # TODO detector should not be closed manually
            detector.close()
            self._report_error("Could not load the feature extraction mechanism")
            return None, None

        return detector, features

    def _load_detector(self, config):
        """Build a ``Detector`` from *config* plus the distribution root path."""
        return Detector(config={**config, 'distribution-root': self.config['model-distribution']})

    def _load_features(self, config):
        """Build a ``Features`` object from *config* plus the distribution root path."""
        return Features(config={**config, 'distribution-root': self.config['model-distribution']})

    def execute(self, subjobs, callback):
        """Process *subjobs* in order and report progress through *callback*.

        For every subjob, ``subjob['jobinfo'][<subjob name>]`` is replaced by a
        dict holding ``'done-by'``, ``'result'`` and, on failure of a
        ``'detect-faces'`` subjob, ``'error'``. Subjobs with any other name
        are marked as failed (``'result': False``).

        Args:
            subjobs: Sequence of subjob dicts. Each provides ``'subjob'`` and
                ``'jobinfo'``; ``'detect-faces'`` subjobs additionally provide
                ``'filename'``, ``'filetype'``, ``'prediction'`` and, for
                videos, ``'frame'`` and optionally ``'cap'``.
            callback: Called with ``0`` at the start, with
                ``index / len(subjobs)`` after every subjob (including failed
                ones) and with ``1`` at the end.
        """
        callback(0)
        subjob_count = float(len(subjobs))

        for index, subjob in enumerate(subjobs):
            subjob_name = subjob['subjob']
            info = {'done-by': 'pipeline'}
            subjob['jobinfo'][subjob_name] = info

            if subjob_name == 'detect-faces':
                info['result'] = self._detect_faces(subjob, info)
            else:
                info['result'] = False

            # Report progress for every subjob, whether it succeeded or not.
            callback(float(index) / subjob_count)

        callback(1)

    def _detect_faces(self, subjob, info):
        """Run face detection for a single subjob.

        On success the detected faces are stored in
        ``subjob['prediction']['faces']``; on failure the reason is stored in
        ``info['error']``.

        Returns:
            ``True`` if detection succeeded, ``False`` otherwise.
        """
        if self.detector is None:
            # Loading failed in _load_distribution; avoid AttributeError on None.
            info['error'] = 'Detector is not available'
            return False
        if self.detector.last_error is not None:
            info['error'] = self.detector.last_error
            return False

        filename = subjob['filename']
        filetype = subjob['filetype']

        # Acquire image
        if filetype == 'image':
            img = Image.open(filename)
        elif filetype == 'video':
            if 'cap' in subjob:
                cap = subjob['cap']
            else:
                cap = Video(filename)
                # The original tested "is None" here, so a real error was
                # never recorded and None was stored as the error otherwise.
                if cap.last_error is not None:
                    info['error'] = cap.last_error
                    return False
            img = cap.get_frame(subjob['frame'])
        else:
            info['error'] = 'File format not supported!'
            return False

        # Run face detection
        faces = self.detector.detect_faces(img)
        if self.detector.last_error is not None:
            info['error'] = self.detector.last_error
            return False

        subjob['prediction']['faces'] = faces
        return True

    def close(self):
        """Close the detector and the feature extractor, if they were loaded."""
        for component in (self.detector, self.features):
            if component is not None:
                component.close()
|