Eric Tröbs 4 years ago
parent
commit
03aa57c592

+ 5 - 0
app.py

@@ -1,6 +1,7 @@
 from pycs.ApplicationStatus import ApplicationStatus
 from pycs.frontend.WebServer import WebServer
 from pycs.models.ModelManager import ModelManager
+from pycs.pipeline.PipelineManager import PipelineManager
 from pycs.projects.ProjectManager import ProjectManager
 
 if __name__ == '__main__':
@@ -12,6 +13,10 @@ if __name__ == '__main__':
     print('- load model manager')
     model_manager = ModelManager(app_status)
 
+    # load pipeline manager
+    print('- load pipeline manager')
+    pipeline_manager = PipelineManager(app_status)
+
     # load project manager
     print('- load project manager')
     project_manager = ProjectManager(app_status)

+ 1 - 1
pycs/observable/ObservableList.py

@@ -7,7 +7,7 @@ class ObservableList(list, Observable):
         Observable.__init__(self, parent)
 
         for element in lst:
-            self.append(Observable.create(element, self))
+            list.append(self, Observable.create(element, self))
 
     def __getitem__(self, value):
         return super().__getitem__(int(value))
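
Note on the ObservableList change: the class presumably overrides append() to wrap new elements, so calling self.append() inside __init__ would wrap each element twice (once via Observable.create, once via the override). Binding the call to list.append applies the plain list behaviour instead. A minimal standalone sketch of the pattern, independent of the actual pycs Observable API:

    class WrappingList(list):
        def __init__(self, items):
            for element in items:
                # wrap exactly once; the overridden append() below
                # is deliberately bypassed
                list.append(self, self._wrap(element))

        def append(self, value):
            list.append(self, self._wrap(value))

        @staticmethod
        def _wrap(value):
            return ('wrapped', value)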

+ 15 - 0
pycs/pipeline/Job.py

@@ -0,0 +1,15 @@
+from os import path, getcwd
+from uuid import uuid1
+
+
+class Job:
+    def __init__(self, job_type: str, project_id: str, data: dict):
+        self.id = uuid1()
+        self.type = job_type
+        self.object_id = data['id']
+        self.object_full_path = path.join(getcwd(),
+                                          'projects',
+                                          project_id,
+                                          'data',
+                                          data['id'] + data['extension'])
+        self.size = data['size']
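
Usage sketch for Job under the assumptions visible above: the data dict carries 'id', 'extension' and 'size', and object files live under projects/<project_id>/data. All values are hypothetical:

    data = {'id': 'img-0001', 'extension': '.jpg', 'size': 123456}
    job = Job('detect-faces', 'project-42', data)

    print(job.id)                # fresh uuid1 per job
    print(job.object_full_path)  # <cwd>/projects/project-42/data/img-0001.jpg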

+ 32 - 0
pycs/pipeline/Pipeline.py

@@ -0,0 +1,32 @@
+from pycs.pipeline.Result import Result
+from pycs.pipeline.Job import Job
+
+
+class Pipeline:
+    def load(self, root: str, distribution: dict):
+        """
+        load and store properties needed for prediction and fitting
+
+        :param root: path to model root directory containing distribution.json
+        :param distribution: pipeline dict from distribution.json containing pipeline specific configuration
+        :return:
+        """
+        pass
+
+    def close(self):
+        """
+        this function is called everytime a pipeline is not needed anymore
+        and should be used to close and clean native resources
+
+        :return:
+        """
+        pass
+
+    def execute(self, job: Job) -> Result:
+        """
+        receive a job, execute it and return the predicted result
+
+        :param job: that should be executed
+        :return:
+        """
+        pass
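
Pipeline is the interface that concrete back ends (such as the tf1 implementation below) fill in. A minimal sketch of an implementation, with hypothetical names, that satisfies all three methods:

    class EchoPipeline(Pipeline):
        def load(self, root: str, distribution: dict):
            self.root = root
            self.distribution = distribution

        def close(self):
            pass  # no native resources to release

        def execute(self, job: Job) -> Result:
            # return an empty prediction list for every job
            return Result(job, [])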

+ 52 - 0
pycs/pipeline/PipelineManager.py

@@ -0,0 +1,52 @@
+from os import getcwd
+from os import path
+
+from eventlet import tpool
+
+from pycs.ApplicationStatus import ApplicationStatus
+from pycs.pipeline.Job import Job
+from pycs.pipeline.tf1.pipeline import Pipeline as TF1Pipeline
+
+
+class PipelineManager:
+    def __init__(self, app_status: ApplicationStatus):
+        self.app_status = app_status
+        app_status['projects'].subscribe(self.__update)
+
+    def __update(self, data):
+        # get current project path
+        opened_projects = list(filter(lambda x: x['status'] == 'open', data))
+        if len(opened_projects) == 0:
+            return
+
+        current_project = opened_projects[0]
+
+        # find images to predict
+        if 'data' not in current_project or len(current_project['data']) == 0:
+            return
+
+        # load pipeline
+        pipeline = tpool.execute(self.__load_pipeline, current_project['pipeline']['model-distribution'])
+
+        # create job list
+        for d in current_project['data']:
+            if 'result' not in d:
+                # TODO update job progress
+                job = Job('detect-faces', current_project['id'], d)
+                result = tpool.execute(pipeline.execute, job)
+                d['result'] = result.predictions
+
+        # close pipeline
+        pipeline.close()
+
+    def __load_pipeline(self, identifier):
+        model_distribution = self.app_status['models'][identifier]
+
+        if model_distribution['mode'] == 'tf1':
+            model_root = path.join(getcwd(), 'models', model_distribution['name'])
+
+            pipeline = TF1Pipeline()
+            pipeline.load(model_root, model_distribution['pipeline'])
+
+            return pipeline
+
+        raise ValueError(f"unknown pipeline mode '{model_distribution['mode']}'")
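
A note on eventlet here: tpool.execute runs a blocking callable in a native thread pool and only resumes the calling green thread once it finishes, so model loading and inference do not stall the event loop. A standalone illustration, independent of pycs:

    from eventlet import tpool
    import time

    def blocking_work(seconds):
        # stands in for slow work such as loading a TensorFlow model
        time.sleep(seconds)
        return 'done'

    # executed in an OS thread; other green threads keep running meanwhile
    print(tpool.execute(blocking_work, 1))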

+ 7 - 0
pycs/pipeline/Result.py

@@ -0,0 +1,7 @@
+from pycs.pipeline.Job import Job
+
+
+class Result:
+    def __init__(self, job: Job, predictions: list):
+        self.job = job
+        self.predictions = predictions

+ 45 - 60
pycs/pipeline/tf1/pipeline.py

@@ -1,67 +1,55 @@
 #!/usr/bin/env python
 
 """pipeline:  Detection and other model pipeline."""
-
-import json
-import os.path
-
 from PIL import Image
 
-from pycs.utils import Errorable
+from pycs.pipeline.Pipeline import Pipeline as PipelineInterface
 from pycs.utils import Video
 from .detection import Detector
 from .features import Features
+from ..Job import Job
+from ..Result import Result
 
 
-class Pipeline(Errorable):
-    def __init__(self, config):
-        Errorable.__init__(self)
-        self.config = config
-
-        self.detector, self.features = self._load_distribution()
-        self._err_children += [self.detector, self.features]
-
-    def _load_distribution(self):
-        try:
-            distribution_path = self.config['model-distribution']
-            with open(os.path.join(distribution_path, 'distribution.json'), 'r') as distribution_json_file:
-                distribution_json = json.load(distribution_json_file)
-
-            detector_config = distribution_json['detection']
-            features_config = distribution_json['features']
-
-        except:
-            self._report_error("Could not parse the distribution configuration")
-            # TODO nothing is returned if no exception occurs
-            return None, None
-
-        try:
-            detector = self._load_detector(detector_config)
-        except:
-            self._report_error("Could not load the detector")
-            return None, None
+class Pipeline(PipelineInterface):
+    def __init__(self):
+        self.__detector = None
+        self.__features = None
 
-        try:
-            features = self._load_features(features_config)
-        except:
-            # TODO detector should not be closed manually
-            detector.close()
-            self._report_error("Could not load the feature extraction mechanism")
-            return None, None
+    def load(self, root: str, distribution: dict):
+        detector_config = distribution['detection']
+        features_config = distribution['features']
 
-        return detector, features
+        self.__detector = Detector(config={
+            **detector_config,
+            'distribution-root': root
+        })
+        self.__features = Features(config={
+            **features_config,
+            'distribution-root': root
+        })
 
-    def _load_detector(self, config):
-        detector = Detector(config={**config, 'distribution-root': self.config['model-distribution']})
-
-        return detector
-
-    def _load_features(self, config):
-        features = Features(config={**config, 'distribution-root': self.config['model-distribution']})
-
-        return features
-
-    def execute(self, subjobs, callback):
+    def close(self):
+        if self.__detector is not None:
+            self.__detector.close()
+        if self.__features is not None:
+            self.__features.close()
+
+    def execute(self, job: Job) -> Result:
+        subjobs = [{
+            'subjob': job.type,
+            'prediction': {},
+            'jobinfo': {},
+            'filetype': 'image',
+            'filename': job.object_full_path
+        }]
+        self.__execute(subjobs, lambda x: 0)
+
+        # 'faces' is only present if detection succeeded; fall back to an empty list
+        return Result(job, subjobs[0]['prediction'].get('faces', []))
+
+    def __execute(self, subjobs, callback):
         callback(0)
         subjob_count = float(len(subjobs))
         for index, subjob in enumerate(subjobs):
@@ -72,12 +60,13 @@ class Pipeline(Errorable):
 
             if subjob_name == 'detect-faces':
                 # Run face detection
-                if self.detector.last_error is not None:
-                    jobinfo[subjob_name]['error'] = self.detector.last_error
+                if self.__detector.last_error is not None:
+                    jobinfo[subjob_name]['error'] = self.__detector.last_error
                     jobinfo[subjob_name]['result'] = False
 
                 else:
                     filename = subjob['filename']
 
                     # Acquire image
                     if subjob['filetype'] == 'image':
@@ -99,10 +88,10 @@ class Pipeline(Errorable):
                         jobinfo[subjob_name]['result'] = False
                         continue
 
-                    faces = self.detector.detect_faces(img)
+                    faces = self.__detector.detect_faces(img)
 
-                    if self.detector.last_error is not None:
-                        jobinfo[subjob_name]['error'] = self.detector.last_error
+                    if self.__detector.last_error is not None:
+                        jobinfo[subjob_name]['error'] = self.__detector.last_error
                         jobinfo[subjob_name]['result'] = False
                     else:
                         prediction['faces'] = faces
@@ -112,7 +101,3 @@ class Pipeline(Errorable):
 
             callback(float(index) / subjob_count)
         callback(1)
-
-    def close(self):
-        self.detector.close()
-        self.features.close()
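
End-to-end usage sketch for the reworked tf1 pipeline. Paths and configuration values are hypothetical; actually running this requires a real model distribution on disk:

    distribution = {'detection': {}, 'features': {}}  # normally read from distribution.json
    job = Job('detect-faces', 'project-42',
              {'id': 'img-0001', 'extension': '.jpg', 'size': 0})

    pipeline = Pipeline()  # the tf1 implementation above
    pipeline.load('/models/face-detector', distribution)
    try:
        result = pipeline.execute(job)
        print(result.predictions)  # list of detected faces
    finally:
        pipeline.close()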

+ 0 - 14
pycs/pipeline/tf1/utils/utils.py

@@ -13,20 +13,6 @@ from google.protobuf import text_format
 
 def create_session_from_config(config):
     try:
-        # (0) Saved Model
-        if 'saved_model' in config.keys():
-            full_path = os.path.join(config["distribution-root"], config["saved_model"])
-            logging.debug(f'Loading saved model {full_path}')
-
-            # model = tf.keras.models.load_model(full_path)
-            # print(full_path)
-            # model = tf.saved_model.load(full_path)
-            # tf.saved_model.save(model, full_path + 'blyat/')
-
-            # print('    <!--')
-            # print(model)
-            # print('    -->')
-
         # (1) Frozen graph
         if "model-frozengraph" in config.keys():
             full_path = os.path.join(config["distribution-root"], config["model-frozengraph"])