123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- #!/usr/bin/env python
- """pipeline: Detection and other model pipeline."""
- from PIL import Image
- from pycs.pipeline.Pipeline import Pipeline as PipelineInterface
- from pycs.utils import Video
- from .detection import Detector
- from .features import Features
- from ..Job import Job
- from ..Result import Result
class Pipeline(PipelineInterface):
    """Pipeline that runs face detection on a single job.

    The pipeline owns a ``Detector`` and a ``Features`` instance. Both are
    created in :meth:`load` and released in :meth:`close`.
    """

    def __init__(self):
        # Both models stay None until load() has been called.
        self.__detector = None
        self.__features = None

    def load(self, root: str, distribution: dict):
        """Create the detector and feature models.

        :param root: value passed to both models under the
            ``'distribution-root'`` config key
        :param distribution: mapping with a ``'detection'`` and a
            ``'features'`` entry; each entry is expanded into the config
            dict of the corresponding model
        """
        print('tf1 load')

        detector_config = distribution['detection']
        features_config = distribution['features']

        self.__detector = Detector(config={
            **detector_config,
            'distribution-root': root
        })
        self.__features = Features(config={
            **features_config,
            'distribution-root': root
        })

    def close(self):
        """Close whichever models have been loaded."""
        print('tf1 close')

        if self.__detector is not None:
            self.__detector.close()
        if self.__features is not None:
            self.__features.close()

    def execute(self, job: Job) -> Result:
        """Run the pipeline on ``job`` and wrap the detected faces.

        The job is translated into a single image subjob whose name is
        ``job.type`` and whose file is ``job.object_full_path``.

        :param job: job to process
        :return: ``Result`` built from the job and the detected faces
        :raises KeyError: if the subjob produced no ``'faces'`` entry, i.e.
            the job type is not ``'detect-faces'`` or detection failed
        """
        subjobs = [{
            'subjob': job.type,
            'prediction': {},
            'jobinfo': {},
            'filetype': 'image',
            'filename': job.object_full_path
        }]

        # Progress is not reported anywhere, so use a no-op callback.
        self.__execute(subjobs, lambda x: 0)

        return Result(job, subjobs[0]['prediction']['faces'])

    def __execute(self, subjobs, callback):
        """Process every subjob in place and report progress.

        For each subjob, ``subjob['jobinfo'][<name>]`` receives a dict with
        ``'done-by'``, a boolean ``'result'`` and, on failure, an ``'error'``
        entry. On success the faces are stored in
        ``subjob['prediction']['faces']``.

        :param subjobs: list of subjob dicts (keys used: ``'subjob'``,
            ``'prediction'``, ``'jobinfo'``, ``'filetype'``, ``'filename'``
            and, for videos, ``'frame'`` plus an optional ``'cap'``)
        :param callback: called with a progress value, starting at 0 and
            ending at 1
        """
        callback(0)
        subjob_count = float(len(subjobs))

        for index, subjob in enumerate(subjobs):
            subjob_name = subjob['subjob']
            prediction = subjob['prediction']
            jobinfo = subjob['jobinfo']

            info = {'done-by': 'pipeline'}
            jobinfo[subjob_name] = info

            if subjob_name == 'detect-faces':
                # Run face detection
                if self.__detector.last_error is not None:
                    # The detector is already in an error state; do not use it.
                    info['error'] = self.__detector.last_error
                    info['result'] = False
                else:
                    filename = subjob['filename']
                    print(filename)

                    # Acquire image
                    if subjob['filetype'] == 'image':
                        img = Image.open(filename)
                    elif subjob['filetype'] == 'video':
                        # Reuse a capture supplied by the caller, otherwise
                        # open the video file ourselves.
                        if 'cap' in subjob:
                            cap = subjob['cap']
                        else:
                            cap = Video(filename)

                        # A capture in an error state cannot deliver frames:
                        # record the error and skip this subjob. (The check
                        # was previously inverted, so the error message was
                        # never stored.)
                        if cap.last_error is not None:
                            info['error'] = cap.last_error
                            info['result'] = False
                            continue

                        img = cap.get_frame(subjob['frame'])
                    else:
                        info['error'] = 'File format not supported!'
                        info['result'] = False
                        continue

                    faces = self.__detector.detect_faces(img)

                    if self.__detector.last_error is not None:
                        info['error'] = self.__detector.last_error
                        info['result'] = False
                    else:
                        prediction['faces'] = faces
                        info['result'] = True
            else:
                # Only 'detect-faces' subjobs are handled here.
                info['result'] = False

            callback(float(index) / subjob_count)

        callback(1)
|