Session.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. from datetime import datetime, timedelta
  2. import pickle
  3. import random
  4. import subprocess
  5. from warnings import warn
  6. import os
  7. from tqdm import tqdm
  8. import matplotlib.image as mpimg
  9. from skimage import transform, io
  10. from py.FileUtils import list_folders, list_jpegs_recursive, verify_expected_subfolders
  11. from py.ImageUtils import display_images, get_image_date
  12. class Session:
  13. def __init__(self, folder: str):
  14. self.folder = folder
  15. # session name = folder name[33:], the first 33 characters are always the same
  16. self.name = os.path.basename(folder)[33:]
  17. print(f"Session '{self.name}' at folder: {self.folder}")
  18. assert self.name != ""
  19. verify_expected_subfolders(self.folder)
  20. self.scanned = False
  21. # maps lapse files to their exif dates (for statistic and prediction purposes)
  22. self.lapse_dates = {}
  23. # maps motion files to their exif dates (for statistic purposes)
  24. self.motion_dates = {}
  25. # maps exif dates to lapse files (for prediction purposes)
  26. self.lapse_map = {}
  27. self.load_scans()
  28. if not self.scanned:
  29. print("Session not scanned. Run session.scan() to create scan files")
  30. def load_scans(self):
  31. lapse_dates_file = os.path.join("session_scans", self.name, "lapse_dates.pickle")
  32. motion_dates_file = os.path.join("session_scans", self.name, "motion_dates.pickle")
  33. lapse_map_file = os.path.join("session_scans", self.name, "lapse_map.pickle")
  34. lapse_dates_exists = os.path.isfile(lapse_dates_file)
  35. motion_dates_exists = os.path.isfile(motion_dates_file)
  36. lapse_map_exists = os.path.isfile(lapse_map_file)
  37. if lapse_dates_exists and motion_dates_exists and lapse_map_exists:
  38. with open(lapse_dates_file, "rb") as handle:
  39. self.lapse_dates = pickle.load(handle)
  40. with open(motion_dates_file, "rb") as handle:
  41. self.motion_dates = pickle.load(handle)
  42. with open(lapse_map_file, "rb") as handle:
  43. self.lapse_map = pickle.load(handle)
  44. self.scanned = True
  45. print("Loaded scans.")
  46. else:
  47. if not (not lapse_dates_exists and not motion_dates_exists and not lapse_map_exists):
  48. warn(f"Warning: Only partial scan data available. Not loading.")
  49. self.scanned = False
  50. def save_scans(self):
  51. os.makedirs(os.path.join("session_scans", self.name), exist_ok=True)
  52. lapse_dates_file = os.path.join("session_scans", self.name, "lapse_dates.pickle")
  53. motion_dates_file = os.path.join("session_scans", self.name, "motion_dates.pickle")
  54. lapse_map_file = os.path.join("session_scans", self.name, "lapse_map.pickle")
  55. with open(lapse_dates_file, "wb") as handle:
  56. pickle.dump(self.lapse_dates, handle, protocol=pickle.HIGHEST_PROTOCOL)
  57. print(f"Saved {lapse_dates_file}")
  58. with open(motion_dates_file, "wb") as handle:
  59. pickle.dump(self.motion_dates, handle, protocol=pickle.HIGHEST_PROTOCOL)
  60. print(f"Saved {motion_dates_file}")
  61. with open(lapse_map_file, "wb") as handle:
  62. pickle.dump(self.lapse_map, handle, protocol=pickle.HIGHEST_PROTOCOL)
  63. print(f"Saved {lapse_map_file}")
  64. def scan(self, force=False, auto_save=True):
  65. if self.scanned and not force:
  66. raise ValueError("Session is already scanned. Use force=True to scan anyway and override scan progress.")
  67. # Scan motion dates
  68. print("Scanning motion dates...")
  69. self.motion_dates = {}
  70. motion_folder = os.path.join(self.folder, "Motion")
  71. for file in tqdm(list_jpegs_recursive(motion_folder)):
  72. self.motion_dates[os.path.relpath(file, motion_folder)] = get_image_date(file)
  73. # Scan lapse dates
  74. print("Scanning lapse dates...")
  75. self.lapse_dates = {}
  76. lapse_folder = os.path.join(self.folder, "Lapse")
  77. for file in tqdm(list_jpegs_recursive(lapse_folder)):
  78. self.lapse_dates[os.path.relpath(file, lapse_folder)] = get_image_date(file)
  79. # Create lapse map
  80. print("Creating lapse map...")
  81. self.lapse_map = {}
  82. for file, date in self.lapse_dates.items():
  83. if date in self.lapse_map:
  84. self.lapse_map[date].append(file)
  85. else:
  86. self.lapse_map[date] = [file]
  87. self.scanned = True
  88. # Auto save
  89. if auto_save:
  90. print("Saving...")
  91. self.save_scans()
  92. def check_lapse_duplicates(self) -> bool:
  93. total = 0
  94. total_duplicates = 0
  95. total_multiples = 0
  96. deviant_duplicates = []
  97. for date, files in tqdm(self.lapse_map.items()):
  98. total += 1
  99. if len(files) > 1:
  100. total_duplicates += 1
  101. file_size = -1
  102. for f in files:
  103. f_size = os.path.getsize(os.path.join(self.folder, "Lapse", f))
  104. if file_size == -1:
  105. file_size = f_size
  106. elif f_size != file_size:
  107. deviant_duplicates.append(date)
  108. break
  109. if len(files) > 2:
  110. total_multiples += 1
  111. deviant_duplicates.sort()
  112. print(f"* {total} lapse dates")
  113. print(f"* {total_duplicates} duplicates")
  114. print(f"* {total_multiples} multiples (more than two files per date)")
  115. print(f"* {len(deviant_duplicates)} deviant duplicates: {deviant_duplicates}")
  116. return total, total_duplicates, total_multiples, deviant_duplicates
  117. def open_images_for_date(self, date: datetime):
  118. img_names = self.lapse_map.get(date, [])
  119. if len(img_names) == 0:
  120. warn("No images for this date!")
  121. for i, img_name in enumerate(img_names):
  122. full_path = os.path.join(self.folder, "Lapse", img_name)
  123. print(f"#{i+1} {full_path}")
  124. subprocess.call(("xdg-open", full_path))
  125. def get_motion_image_from_filename(self, filename: str) -> "MotionImage":
  126. if filename in self.motion_dates:
  127. return MotionImage(self, filename, self.motion_dates[filename])
  128. else:
  129. raise ValueError(f"Unknown motion file name: {filename}")
  130. def get_random_motion_image(self, day_only=False, night_only=False) -> "MotionImage":
  131. if len(self.motion_dates) == 0:
  132. raise ValueError("No motion images in session!")
  133. img = None
  134. while img is None or (day_only and img.is_nighttime()) or (night_only and img.is_daytime()):
  135. filename = random.choice(list(self.motion_dates.keys()))
  136. img = MotionImage(self, filename, self.motion_dates[filename])
  137. return img
  138. def get_closest_lapse_images(self, motion_file: str):
  139. date: datetime = self.motion_dates[motion_file]
  140. previous_date = date.replace(minute=0, second=0)
  141. next_date = previous_date + timedelta(hours=1)
  142. while not previous_date in self.lapse_map:
  143. previous_date -= timedelta(hours=1)
  144. while not next_date in self.lapse_map:
  145. next_date += timedelta(hours=1)
  146. if len(self.lapse_map[previous_date]) > 1:
  147. warn(f"There are multiple lapse images for date {previous_date}! Choosing the first one.")
  148. if len(self.lapse_map[next_date]) > 1:
  149. warn(f"There are multiple lapse images for date {next_date}! Choosing the first one.")
  150. return LapseImage(self, self.lapse_map[previous_date][0], previous_date), LapseImage(self, self.lapse_map[next_date][0], next_date)
  151. class MotionImage:
  152. def __init__(self, session: Session, filename: str, date: datetime):
  153. self.session = session
  154. self.filename = filename
  155. self.date = date
  156. if not self.filename in session.motion_dates:
  157. raise ValueError(f"File name {filename} not in session!")
  158. if not os.path.isfile(self.get_full_path()):
  159. raise ValueError(f"File Motion/{filename} in session folder {session.folder} not found!")
  160. def get_full_path(self) -> str:
  161. return os.path.join(self.session.folder, "Motion", self.filename)
  162. def open(self):
  163. full_path = self.get_full_path()
  164. print(f"Opening {full_path}...")
  165. subprocess.call(("xdg-open", full_path))
  166. def read(self, truncate_y = (40, 40), scale=1, gray=True):
  167. full_path = self.get_full_path()
  168. img = io.imread(full_path, as_gray=gray)
  169. # truncate
  170. if truncate_y is not None:
  171. if truncate_y[0] > 0 and truncate_y[1] > 0:
  172. img = img[truncate_y[0]:(-truncate_y[1]),:]
  173. elif truncate_y[0] > 0:
  174. img = img[truncate_y[0]:,:]
  175. elif truncate_y[1] > 0:
  176. img = img[:(-truncate_y[1]),:]
  177. # scale
  178. if scale is not None and scale < 1:
  179. img = transform.rescale(img, scale)
  180. return img
  181. def get_closest_lapse_images(self):
  182. before, after = self.session.get_closest_lapse_images(self.filename)
  183. # rel = 0 if motion image was taken at before lapse image, rel = 1 if motion image was taken at after lapse image
  184. rel = (self.date - before.date).total_seconds() / (after.date - before.date).total_seconds()
  185. return before, after, rel
  186. def is_daytime(self):
  187. return 6 <= self.date.hour <= 18
  188. def is_nighttime(self):
  189. return not self.is_daytime()
  190. class LapseImage:
  191. def __init__(self, session: Session, filename: str, date: datetime):
  192. self.session = session
  193. self.filename = filename
  194. self.date = date
  195. if not self.filename in session.lapse_dates:
  196. raise ValueError(f"File name {filename} not in session!")
  197. if not os.path.isfile(self.get_full_path()):
  198. raise ValueError(f"File Lapse/{filename} in session folder {session.folder} not found!")
  199. def get_full_path(self) -> str:
  200. return os.path.join(self.session.folder, "Lapse", self.filename)
  201. def open(self):
  202. full_path = self.get_full_path()
  203. print(f"Opening {full_path}...")
  204. subprocess.call(("xdg-open", full_path))
  205. def read(self, truncate_y = (40, 40), scale=1, gray=True):
  206. full_path = self.get_full_path()
  207. img = io.imread(full_path, as_gray=gray)
  208. # truncate
  209. if truncate_y is not None:
  210. if truncate_y[0] > 0 and truncate_y[1] > 0:
  211. img = img[truncate_y[0]:(-truncate_y[1]),:]
  212. elif truncate_y[0] > 0:
  213. img = img[truncate_y[0]:,:]
  214. elif truncate_y[1] > 0:
  215. img = img[:(-truncate_y[1]),:]
  216. # scale
  217. if scale is not None and scale < 1:
  218. img = transform.rescale(img, scale)
  219. return img