Przeglądaj źródła

Resolve "abstract model interface"

Eric Tröbs 4 lat temu
rodzic
commit
664beb297b

+ 2 - 0
pycs/models/ModelManager.py

@@ -14,5 +14,7 @@ class ModelManager:
             # load distribution.json
             with open(path.join(folder, 'distribution.json'), 'r') as file:
                 model = load(file)
+                model['path'] = folder
+
                 model_id = model['id']
                 app_status['models'][model_id] = model

+ 6 - 7
pycs/pipeline/Pipeline.py

@@ -3,20 +3,19 @@ from pycs.pipeline.Job import Job
 
 
 class Pipeline:
-    def load(self, root: str, distribution: dict):
+    def __init__(self, root_folder, distribution):
         """
-        load and store properties needed for prediction and fitting
+        prepare everything needed to run jobs later
 
-        :param root: path to model root directory containing distribution.json
-        :param distribution: pipeline dict from distribution.json containing pipeline specific configuration
-        :return:
+        :param root_folder: relative path to model folder
+        :param distribution: object parsed from distribution.json
         """
         pass
 
     def close(self):
         """
-        this function is called everytime a pipeline is not needed anymore
-        and should be used to close and clean native resources
+        is called everytime a pipeline is not needed anymore and should be used
+        to close and clean native resources
 
         :return:
         """

+ 16 - 11
pycs/pipeline/PipelineManager.py

@@ -1,23 +1,26 @@
-from os import getcwd
 from os import path
 
 from eventlet import tpool
 
 from pycs.pipeline.Job import Job
-from pycs.pipeline.tf1.pipeline import Pipeline as TF1Pipeline
 from pycs.projects.Project import Project
 
 
 class PipelineManager:
-    def __init__(self, project: Project):
-        self.project = project
-        self.pipeline = tpool.execute(self.__load_pipeline, project['pipeline']['model-distribution'])
+    def __init__(self, project: Project, pipeline):
+        code_path = path.join(pipeline['path'], pipeline['code']['module'])
+        module_name = code_path.replace('/', '.').replace('\\', '.')
+        class_name = pipeline['code']['class']
+
+        mod = __import__(module_name, fromlist=[class_name])
+        cl = getattr(mod, class_name)
 
-    def __enter__(self):
-        return self
+        self.project = project
+        self.pipeline = cl(pipeline['path'], pipeline)
 
-    def __exit__(self, type, value, traceback):
+    def close(self):
         self.pipeline.close()
+        print('PipelineManager', 'close')
 
     def run(self, media_file):
         # create job list
@@ -32,16 +35,18 @@ class PipelineManager:
         for prediction in result.predictions:
             media_file.add_result(prediction, origin='pipeline')
 
+    '''
     def __load_pipeline(self, pipeline_identifier):
         model_distribution = self.project.parent.parent['models'][pipeline_identifier]
 
         if model_distribution['mode'] == 'tf1':
             model_root = path.join(getcwd(), 'models', model_distribution['name'])
 
-            pipeline = TF1Pipeline()
-            pipeline.load(model_root, model_distribution['pipeline'])
+            #pipeline = TF1Pipeline()
+            #pipeline.load(model_root, model_distribution['pipeline'])
 
-            return pipeline
+            #return pipeline
+    '''
 
     '''
     def __update(self, data):

+ 0 - 5
pycs/pipeline/tf1/__init__.py

@@ -1,5 +0,0 @@
-#!/usr/bin/env python
-
-"""pipeline:  Detection and other model pipeline."""
-
-from pycs.pipeline.tf1.pipeline import Pipeline

+ 0 - 5
pycs/pipeline/tf1/detection/__init__.py

@@ -1,5 +0,0 @@
-#!/usr/bin/env python
-
-"""detection:  Face detection implementation."""
-
-from .detector import Detector

+ 0 - 64
pycs/pipeline/tf1/detection/detector.py

@@ -1,64 +0,0 @@
-#!/usr/bin/env python
-
-"""Detector:  Face detection implementation."""
-
-import logging
-
-import numpy as np
-
-from ..utils import TFModel
-
-
-# import tensorflow.contrib.slim as slim
-
-
-class Detector(TFModel):
-    def __init__(self, config):
-        TFModel.__init__(self, config)
-
-        try:
-            # (1) Find feature tensor
-            self.tf_image_tensor = self.tf_graph.get_tensor_by_name("import/image_tensor:0")
-
-            self.tf_detection_boxes = self.tf_graph.get_tensor_by_name('import/detection_boxes:0')
-            self.tf_detection_scores = self.tf_graph.get_tensor_by_name('import/detection_scores:0')
-            self.tf_detection_classes = self.tf_graph.get_tensor_by_name('import/detection_classes:0')
-            self.tf_num_detections = self.tf_graph.get_tensor_by_name('import/num_detections:0')
-
-            self.input_shape = self.tf_image_tensor.shape[1:].as_list()
-            if "downscale-to" in config.keys():
-                for i in range(len(self.input_shape)):
-                    if self.input_shape[i] is None:
-                        self.input_shape[i] = config["downscale-to"]
-            logging.debug("Input shape: %s" % self.input_shape)
-
-        except:
-            self._report_error("Could not access tensors by name")
-
-    def detect_faces(self, image):
-        if None not in self.input_shape:
-            resized_image = image.resize(size=self.input_shape[0:2])
-        else:
-            resized_image = image
-
-        (boxes, scores, classes, num) = self.tf_session.run(
-            [self.tf_detection_boxes, self.tf_detection_scores, self.tf_detection_classes, self.tf_num_detections],
-            feed_dict={self.tf_image_tensor: np.expand_dims(resized_image, axis=0)})
-
-        sample_num = int(num[0])
-        sample_scores = scores[0][0:sample_num]
-        sample_boxes = boxes[0][0:sample_num]
-
-        filtered_boxes = sample_boxes[sample_scores > 0.5]
-        filtered_scores = sample_scores[sample_scores > 0.5]
-
-        ret_boxes = []
-        for index, box in enumerate(filtered_boxes):
-            score = sample_scores[index]
-            ymin, xmin, ymax, xmax = box
-            ret_box = {'x': float(xmin), 'y': float(ymin),
-                       'w': float(xmax - xmin), 'h': float(ymax - ymin),
-                       'score': float(score)}
-            ret_boxes += [ret_box]
-
-        return ret_boxes

+ 0 - 5
pycs/pipeline/tf1/features/__init__.py

@@ -1,5 +0,0 @@
-#!/usr/bin/env python
-
-"""__init__:  Feature extraction."""
-
-from .features import Features

+ 0 - 39
pycs/pipeline/tf1/features/features.py

@@ -1,39 +0,0 @@
-#!/usr/bin/env python
-
-"""features.py:  Feature extraction."""
-
-# import tensorflow.contrib.slim as slim
-import logging
-
-import numpy as np
-import scipy.misc
-import tensorflow as tf
-
-from ..utils import TFModel
-
-
-class Features(TFModel):
-    def __init__(self, config):
-        TFModel.__init__(self, config)
-
-        try:
-            # (1) Find feature tensor
-            feature_tensor = self.tf_graph.get_tensor_by_name(config["features-tensor"])
-            # self.tf_feature_out = slim.flatten(feature_tensor)
-            self.tf_feature_out = tf.keras.layers.Flatten()(feature_tensor)
-
-            # (2) Find input tensor
-            self.tf_input = self.tf_graph.get_tensor_by_name(config["input-tensor"])
-
-            self.input_shape = self.tf_input.shape[1:]
-            logging.debug("Input shape: %s" % self.input_shape)
-
-        except:
-            self._report_error("Could not access tensors by name")
-
-    def extract_features(self, image):
-        resized_image = scipy.misc.imresize(image, self.input_shape)
-        features = self.tf_session.run(self.tf_feature_out,
-                                       feed_dict={self.tf_input: np.expand_dims(resized_image, axis=0)})
-
-        return features

+ 0 - 103
pycs/pipeline/tf1/pipeline.py

@@ -1,103 +0,0 @@
-#!/usr/bin/env python
-
-"""pipeline:  Detection and other model pipeline."""
-from PIL import Image
-
-from pycs.pipeline.Pipeline import Pipeline as PipelineInterface
-from pycs.utils import Video
-from .detection import Detector
-from .features import Features
-from ..Job import Job
-from ..Result import Result
-
-
-class Pipeline(PipelineInterface):
-    def __init__(self):
-        self.__detector = None
-        self.__features = None
-
-    def load(self, root: str, distribution: dict):
-        print('tf1 load')
-        detector_config = distribution['detection']
-        features_config = distribution['features']
-
-        self.__detector = Detector(config={
-            **detector_config,
-            'distribution-root': root
-        })
-        self.__features = Features(config={
-            **features_config,
-            'distribution-root': root
-        })
-
-    def close(self):
-        print('tf1 close')
-        if self.__detector is not None:
-            self.__detector.close()
-        if self.__features is not None:
-            self.__features.close()
-
-    def execute(self, job: Job) -> Result:
-        subjobs = [{
-            'subjob': job.type,
-            'prediction': {},
-            'jobinfo': {},
-            'filetype': 'image',
-            'filename': job.object_full_path
-        }]
-        self.__execute(subjobs, lambda x: 0)
-
-        return Result(job, subjobs[0]['prediction']['faces'])
-
-    def __execute(self, subjobs, callback):
-        callback(0)
-        subjob_count = float(len(subjobs))
-        for index, subjob in enumerate(subjobs):
-            subjob_name = subjob['subjob']
-            prediction = subjob['prediction']
-            jobinfo = subjob['jobinfo']
-            jobinfo[subjob_name] = {'done-by': 'pipeline'}
-
-            if subjob_name == 'detect-faces':
-                # Run face detection
-                if self.__detector.last_error is not None:
-                    jobinfo[subjob_name]['error'] = self.__detector.last_error
-                    jobinfo[subjob_name]['result'] = False
-
-                else:
-                    filename = subjob['filename']
-                    print(filename)
-
-                    # Acquire image
-                    if subjob['filetype'] == 'image':
-                        img = Image.open(filename)
-                    elif subjob['filetype'] == 'video':
-                        if 'cap' in subjob.keys():
-                            cap = subjob['cap']
-                        else:
-                            cap = Video(filename)
-                        if cap.last_error is None:
-                            jobinfo[subjob_name]['error'] = cap.last_error
-                        else:
-                            jobinfo[subjob_name]['result'] = False
-                            continue
-
-                        img = cap.get_frame(subjob['frame'])
-                    else:
-                        jobinfo[subjob_name]['error'] = 'File format not supported!'
-                        jobinfo[subjob_name]['result'] = False
-                        continue
-
-                    faces = self.__detector.detect_faces(img)
-
-                    if self.__detector.last_error is not None:
-                        jobinfo[subjob_name]['error'] = self.__detector.last_error
-                        jobinfo[subjob_name]['result'] = False
-                    else:
-                        prediction['faces'] = faces
-                        jobinfo[subjob_name]['result'] = True
-            else:
-                jobinfo[subjob_name]['result'] = False
-
-            callback(float(index) / subjob_count)
-        callback(1)

+ 0 - 5
pycs/pipeline/tf1/utils/__init__.py

@@ -1,5 +0,0 @@
-#!/usr/bin/env python
-
-"""utils:  Various helper functions."""
-
-from .tfmodel import TFModel

+ 0 - 25
pycs/pipeline/tf1/utils/tfmodel.py

@@ -1,25 +0,0 @@
-#!/usr/bin/env python
-
-"""tfmodel:  Base class for tensorflow based models."""
-
-from pycs.utils import Errorable
-from .utils import create_session_from_config
-
-
-class TFModelException(Exception):
-    def __init__(self, message):
-        Exception.__init__(self, message)
-
-
-class TFModel(Errorable):
-    def __init__(self, config):
-        Errorable.__init__(self)
-
-        self.tf_session = create_session_from_config(config)
-        if self.tf_session is None:
-            self.last_error = 'Session creation failed.'
-
-        self.tf_graph = self.tf_session.graph
-
-    def close(self):
-        self.tf_session.close()

+ 0 - 110
pycs/pipeline/tf1/utils/utils.py

@@ -1,110 +0,0 @@
-#!/usr/bin/env python
-
-"""utils:  Various helper functions."""
-
-import logging
-import os.path
-import sys
-
-import google.protobuf.message
-import tensorflow as tf
-from google.protobuf import text_format
-
-
-def create_session_from_config(config):
-    try:
-        # (1) Frozen graph
-        if "model-frozengraph" in config.keys():
-            full_path = os.path.join(config["distribution-root"], config["model-frozengraph"])
-            logging.debug("Loading frozen graph: %s" % full_path)
-            graph = tf.Graph()
-            with graph.as_default():
-                graph_definition = tf.compat.v1.GraphDef()
-                with tf.io.gfile.GFile(full_path, 'rb') as graph_file:
-                    logging.debug("Opened file, deserializing...")
-                    serialized_graph = graph_file.read()
-                    graph_definition.ParseFromString(serialized_graph)
-                    tf.import_graph_def(graph_definition)
-
-            logging.debug("Done, creating session.")
-            return _create_tf_session(graph=graph)
-
-        # (2) GraphDef + Checkpoint
-        elif "model-graphdef" in config.keys():
-            full_path_def = os.path.join(config["distribution-root"], config["model-graphdef"])
-            full_path_ckpt = os.path.join(config["distribution-root"], config["model-checkpoint"])
-            logging.debug("Loading graphdef: %s" % full_path_def)
-            graph = tf.Graph()
-            with graph.as_default():
-                graph_definition = tf.compat.v1.GraphDef()
-                if full_path_def.endswith('pb'):
-                    with tf.io.gfile.GFile(full_path_def, 'rb') as graph_file:
-                        logging.debug("Opened file, deserializing...")
-                        serialized_graph = graph_file.read()
-                        graph_definition.ParseFromString(serialized_graph)
-                elif full_path_def.endswith('graph') or full_path_def.endswith('pbtxt'):
-                    with tf.io.gfile.GFile(full_path_def, 'r') as graph_file:
-                        logging.debug("Opened file, deserializing...")
-                        text_format.Merge(graph_file.read(), graph_definition)
-                else:
-                    raise Exception("Unknown file type: %s" % full_path_def)
-
-                tf.import_graph_def(graph_definition)
-
-            logging.debug("Done, creating session.")
-            session = _create_tf_session(graph=graph)
-
-            logging.debug('Restoring checkpoint %s' % full_path_ckpt)
-            saver = tf.compat.v1.train.Saver()
-            saver.restore(sess=session, save_path=full_path_ckpt)
-
-            return session
-
-        # (3) Metagraph
-        elif "model-metagraph" in config.keys():
-            full_path_meta = os.path.join(config["distribution-root"], config["model-metagraph"])
-            full_path_ckpt = os.path.join(config["distribution-root"], config["model-checkpoint"])
-
-            logging.debug('Importing metagraph, creating session...')
-            session = _create_tf_session()
-
-            logging.debug('Loading %s' % full_path_meta)
-            saver = tf.compat.v1.train.import_meta_graph(full_path_meta, clear_devices=True, import_scope="import")
-
-            logging.debug('Restoring checkpoint %s' % full_path_ckpt)
-            saver.restore(sess=session, save_path=full_path_ckpt)
-
-            return session
-    except OSError as os_error:
-        logging.error("Error while attempting to load: %s" % os_error)
-        logging.error("Config: %s" % config)
-    except google.protobuf.message.DecodeError as decode_error:
-        logging.error("Error while attempting to load: %s" % decode_error)
-        logging.error("Config: %s" % config)
-    except:
-        t = sys.exc_info()[0]
-        v = sys.exc_info()[1]
-        logging.error("Error while attempting to load: %s, %s" % (t, v))
-        logging.error("Config: %s" % config)
-
-    return None
-
-
-def _create_tf_session(graph=None):
-    # tf_config = tf.compat.v1.ConfigProto()
-    # tf_config.gpu_options.allow_growth = True
-    gpu_devices = tf.config.experimental.list_physical_devices('GPU')
-    for device in gpu_devices:
-        tf.config.experimental.set_memory_growth(device, True)
-
-    with graph.as_default():
-        print(graph)
-        # TODO convert graph ?
-        pass
-
-    if graph is not None:
-        session = tf.compat.v1.Session(graph=graph)
-    else:
-        session = tf.compat.v1.Session()
-
-    return session

+ 38 - 7
pycs/projects/ProjectManager.py

@@ -5,6 +5,8 @@ from shutil import rmtree
 from time import time
 from uuid import uuid1
 
+from eventlet import spawn_after
+
 from pycs import ApplicationStatus
 from pycs.observable import ObservableDict
 from pycs.pipeline.PipelineManager import PipelineManager
@@ -12,7 +14,12 @@ from pycs.projects.Project import Project
 
 
 class ProjectManager(ObservableDict):
+    DEFAULT_PIPELINE_TIMEOUT = 120
+
     def __init__(self, app_status: ApplicationStatus):
+        self.pipeline_manager = None
+        self.quit_pipeline_thread = None
+
         # TODO create projects folder if it does not exist
         self.app_status = app_status
 
@@ -86,10 +93,34 @@ class ProjectManager(ObservableDict):
 
         project = self[uuid]
 
-        # load pipeline
-        with PipelineManager(project) as pm:
-            # TODO add jobs to list
-            # run predictions
-            for file_id in identifiers:
-                if file_id in project['data'].keys():
-                    pm.run(project['data'][file_id])
+        # abort pipeline termination
+        if self.quit_pipeline_thread is not None:
+            self.quit_pipeline_thread.cancel()
+            self.quit_pipeline_thread = None
+
+        # create pipeline if it does not exist already
+        if self.pipeline_manager is None:
+            # abort if pipeline id is no valid key
+            pipeline_identifier = project['pipeline']['model-distribution']
+            pipeline = self.parent['models'][pipeline_identifier]
+
+            self.pipeline_manager = PipelineManager(project, pipeline)
+
+        # run jobs
+        for file_id in identifiers:
+            if file_id in project['data'].keys():
+                self.pipeline_manager.run(project['data'][file_id])
+
+        # quit timeout thread
+        if self.quit_pipeline_thread is not None:
+            self.quit_pipeline_thread.cancel()
+            self.quit_pipeline_thread = None
+
+        # schedule timeout thread
+        self.quit_pipeline_thread = spawn_after(self.DEFAULT_PIPELINE_TIMEOUT, self.__quit_pipeline)
+
+    def __quit_pipeline(self):
+        if self.pipeline_manager is not None:
+            self.pipeline_manager.close()
+            self.pipeline_manager = None
+            self.quit_pipeline_thread = None