#!/usr/bin/env python
"""webcam_demo: A live webcam stream with overlaid detection information."""
import collections.abc
import os
import sys
import tarfile
import time
from queue import Empty, Queue
from threading import Thread

import cv2
import numpy as np
import six.moves.urllib as urllib
import tensorflow as tf

from object_detection.utils import label_map_util
from object_detection.utils import visualization_utils as vis_util

# Which model to download.
#MODEL_NAME = 'ssd_mobilenet_v1_coco_11_06_2017'
MODEL_NAME = 'ssd_inception_v2_coco_11_06_2017'
#MODEL_NAME = 'faster_rcnn_inception_resnet_v2_atrous_coco_11_06_2017'
MODEL_FILE = MODEL_NAME + '.tar.gz'
DOWNLOAD_BASE = 'http://download.tensorflow.org/models/object_detection/'

# Path to the frozen detection graph: the actual model used for detection.
PATH_TO_CKPT = MODEL_NAME + '/frozen_inference_graph.pb'

# Label map used to attach the correct class name to each box.
PATH_TO_LABELS = os.path.join('data', 'mscoco_label_map.pbtxt')

NUM_CLASSES = 90


class FileVideoStream:
    """Threaded capture that keeps only the freshest frames in a small queue."""

    def __init__(self, device=0, queue_size=2):
        self.stream = cv2.VideoCapture(device)
        ret, _ = self.stream.read()
        if not ret:
            print('Error: read() returned False. isOpened: %r' % self.stream.isOpened())
        self.stopped = not ret
        self.Q = Queue(maxsize=queue_size)

    def start(self):
        t = Thread(target=self.update, args=())
        t.daemon = True
        t.start()
        self.running_thread = t
        return self

    def update(self):
        # Producer loop: grab frames as fast as the camera delivers them.
        while True:
            if self.stopped:
                return
            ret, frame = self.stream.read()
            if not ret:
                self.stop()
                return
            if self.Q.full():
                # Drop the oldest frame so the consumer always sees a recent one.
                try:
                    self.Q.get_nowait()
                except Empty:
                    pass
            self.Q.put(frame)
            time.sleep(0)  # Yield to the consumer thread.

    def read(self):
        return self.Q.get()

    def more(self):
        return self.Q.qsize() > 0

    def stop(self):
        self.stopped = True

    def close(self):
        self.stop()
        self.running_thread.join()
        self.stream.release()
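
# The queue above deliberately stays tiny (queue_size=2) so the consumer
# always sees a near-live frame: when the queue is full, the producer drops
# the oldest frame before enqueueing the newest one. Below is a minimal smoke
# test of the threaded stream on its own, with no detection model involved;
# this helper is illustrative only and is not called by main():
def preview_stream(device=0, seconds=5.0):
    """Show raw frames from the threaded stream for a few seconds."""
    stream = FileVideoStream(device).start()
    deadline = time.time() + seconds
    while time.time() < deadline and not stream.stopped:
        if stream.more():
            cv2.imshow('Preview', stream.read())
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    stream.close()
    cv2.destroyAllWindows()
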
def main(cam_id):
    frames = 0

    # Download and unpack the frozen model if it is not already present.
    if not os.path.exists(MODEL_FILE):
        print('Downloading model...')
        urllib.request.urlretrieve(DOWNLOAD_BASE + MODEL_FILE, MODEL_FILE)
    tar_file = tarfile.open(MODEL_FILE)
    for member in tar_file.getmembers():
        file_name = os.path.basename(member.name)
        if 'frozen_inference_graph.pb' in file_name:
            tar_file.extract(member, os.getcwd())

    # Load the frozen graph into memory (TensorFlow 1.x graph/session API;
    # on TensorFlow 2.x these calls live under tf.compat.v1).
    detection_graph = tf.Graph()
    with detection_graph.as_default():
        od_graph_def = tf.GraphDef()
        with tf.gfile.GFile(PATH_TO_CKPT, 'rb') as fid:
            serialized_graph = fid.read()
            od_graph_def.ParseFromString(serialized_graph)
            tf.import_graph_def(od_graph_def, name='')

    # Build the mapping from class indices to human-readable category names.
    label_map = label_map_util.load_labelmap(PATH_TO_LABELS)
    categories = label_map_util.convert_label_map_to_categories(
        label_map, max_num_classes=NUM_CLASSES, use_display_name=True)
    category_index = label_map_util.create_category_index(categories)

    def load_image_into_numpy_array(image):
        # PIL images expose a (width, height) tuple as .size; numpy arrays
        # expose an int, so the isinstance check tells the two apart.
        if isinstance(image.size, collections.abc.Sequence):
            (im_width, im_height) = image.size
            return np.array(image.getdata(), dtype=np.uint8).reshape(
                (im_height, im_width, 3))
        im_height, im_width = image.shape[0], image.shape[1]
        return np.array(image).reshape((im_height, im_width, 3))

    with detection_graph.as_default():
        with tf.Session(graph=detection_graph) as sess:
            cap = FileVideoStream(cam_id).start()
            if cap.stopped:
                print('Error: Stream is not available')
                sys.exit(1)

            # Input and output tensors for detection_graph.
            image_tensor = detection_graph.get_tensor_by_name('image_tensor:0')
            # Each box represents a part of the image where a particular object was detected.
            detection_boxes = detection_graph.get_tensor_by_name('detection_boxes:0')
            # Each score represents the confidence for the corresponding object;
            # it is shown on the result image together with the class label.
            detection_scores = detection_graph.get_tensor_by_name('detection_scores:0')
            detection_classes = detection_graph.get_tensor_by_name('detection_classes:0')
            num_detections = detection_graph.get_tensor_by_name('num_detections:0')

            first = time.time()
            last = first
            cv2.namedWindow('Webcam Demo', cv2.WND_PROP_FULLSCREEN)
            cv2.setWindowProperty('Webcam Demo', cv2.WND_PROP_FULLSCREEN,
                                  cv2.WINDOW_FULLSCREEN)
            print('Running...')
            while True:
                now = time.time()
                diff = now - last
                last = now
                fps_string = 'FPS: %02.1f' % (1.0 / diff) if diff > 0 else 'FPS: --'

                frame = cap.read()
                # The exported graph resizes its input internally, so the frame
                # can be fed at full resolution; only a BGR->RGB swap is needed.
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image_np = load_image_into_numpy_array(rgb)
                # Expand dimensions since the model expects images to have
                # shape [1, None, None, 3].
                image_np_expanded = np.expand_dims(image_np, axis=0)

                # Actual detection.
                (boxes, scores, classes, num) = sess.run(
                    [detection_boxes, detection_scores, detection_classes,
                     num_detections],
                    feed_dict={image_tensor: image_np_expanded})

                # Draw boxes and labels directly onto the BGR frame; the boxes
                # are in normalized coordinates, so no rescaling is needed.
                vis_util.visualize_boxes_and_labels_on_image_array(
                    frame,
                    np.squeeze(boxes),
                    np.squeeze(classes).astype(np.int32),
                    np.squeeze(scores),
                    category_index,
                    use_normalized_coordinates=True,
                    line_thickness=8)

                cv2.putText(frame, fps_string, (0, 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0))
                cv2.putText(frame, 'Elapsed time: %02.1fs' % (now - first),
                            (0, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0))
                cv2.imshow('Webcam Demo', frame)
                frames += 1

                if cv2.waitKey(1) & 0xFF == ord('q'):  # or (now - first) >= 20.0 for a timed run
                    print('Benchmark done. FPS avg: %02.1f' % (frames / (now - first)))
                    print('Time per frame: %.1f ms' % (1000.0 * (now - first) / frames))
                    print('Elapsed time: %02.1fs' % (now - first))
                    break

            cap.close()
            cv2.destroyAllWindows()


if __name__ == '__main__':
    cam_id = int(sys.argv[1]) if len(sys.argv) > 1 else 0
    main(cam_id)
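
# Example invocation (assuming camera 0 is the machine's default webcam):
#
#     python webcam_demo.py 0
#
# Press 'q' in the fullscreen window to stop; the script then prints the
# average FPS, time per frame, and total elapsed time before exiting.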