disk_usage.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. ################################################################################
  2. # Copyright (c) 2021 ContinualAI. #
  3. # Copyrights licensed under the MIT License. #
  4. # See the accompanying LICENSE file for terms. #
  5. # #
  6. # Date: 19-01-2021 #
  7. # Author(s): Lorenzo Pellegrini #
  8. # E-mail: contact@continualai.org #
  9. # Website: www.continualai.org #
  10. ################################################################################
  11. import os
  12. from pathlib import Path
  13. from typing import Union, Sequence, List, Optional
  14. from avalanche.evaluation import Metric, PluginMetric, GenericPluginMetric
  15. PathAlike = Union[Union[str, Path]]
  16. class DiskUsage(Metric[float]):
  17. """
  18. The standalone disk usage metric.
  19. This metric can be used to monitor the size of a set of directories.
  20. e.g. This can be useful to monitor the size of a replay buffer,
  21. """
  22. def __init__(self,
  23. paths_to_monitor: Union[PathAlike, Sequence[PathAlike]] = None
  24. ):
  25. """
  26. Creates an instance of the standalone disk usage metric.
  27. The `result` method will return the sum of the size
  28. of the directories specified as the first parameter in KiloBytes.
  29. :param paths_to_monitor: a path or a list of paths to monitor. If None,
  30. the current working directory is used. Defaults to None.
  31. """
  32. if paths_to_monitor is None:
  33. paths_to_monitor = [os.getcwd()]
  34. if isinstance(paths_to_monitor, (str, Path)):
  35. paths_to_monitor = [paths_to_monitor]
  36. self._paths_to_monitor: List[str] = [str(p) for p in paths_to_monitor]
  37. self.total_usage = 0
  38. def update(self):
  39. """
  40. Updates the disk usage statistics.
  41. :return None.
  42. """
  43. dirs_size = 0
  44. for directory in self._paths_to_monitor:
  45. dirs_size += DiskUsage.get_dir_size(directory)
  46. self.total_usage = dirs_size
  47. def result(self) -> Optional[float]:
  48. """
  49. Retrieves the disk usage as computed during the last call to the
  50. `update` method.
  51. Calling this method will not change the internal state of the metric.
  52. :return: The disk usage or None if `update` was not invoked yet.
  53. """
  54. return self.total_usage
  55. def reset(self) -> None:
  56. """
  57. Resets the metric.
  58. :return: None.
  59. """
  60. self.total_usage = 0
  61. @staticmethod
  62. def get_dir_size(path: str):
  63. total_size = 0
  64. for dirpath, dirnames, filenames in os.walk(path):
  65. for f in filenames:
  66. fp = os.path.join(dirpath, f)
  67. # skip if it is symbolic link
  68. if not os.path.islink(fp):
  69. # in KB
  70. s = os.path.getsize(fp) / 1024
  71. total_size += s
  72. return total_size
  73. class DiskPluginMetric(GenericPluginMetric[float]):
  74. def __init__(self, paths, reset_at, emit_at, mode):
  75. self._disk = DiskUsage(paths_to_monitor=paths)
  76. super(DiskPluginMetric, self).__init__(
  77. self._disk, reset_at=reset_at, emit_at=emit_at,
  78. mode=mode)
  79. def update(self, strategy):
  80. self._disk.update()
  81. class MinibatchDiskUsage(DiskPluginMetric):
  82. """
  83. The minibatch Disk usage metric.
  84. This plugin metric only works at training time.
  85. At the end of each iteration, this metric logs the total
  86. size (in KB) of all the monitored paths.
  87. If a more coarse-grained logging is needed, consider using
  88. :class:`EpochDiskUsage`.
  89. """
  90. def __init__(self, paths_to_monitor):
  91. """
  92. Creates an instance of the minibatch Disk usage metric.
  93. """
  94. super(MinibatchDiskUsage, self).__init__(
  95. paths_to_monitor,
  96. reset_at='iteration', emit_at='iteration', mode='train')
  97. def __str__(self):
  98. return "DiskUsage_MB"
  99. class EpochDiskUsage(DiskPluginMetric):
  100. """
  101. The Epoch Disk usage metric.
  102. This plugin metric only works at training time.
  103. At the end of each epoch, this metric logs the total
  104. size (in KB) of all the monitored paths.
  105. """
  106. def __init__(self, paths_to_monitor):
  107. """
  108. Creates an instance of the epoch Disk usage metric.
  109. """
  110. super(EpochDiskUsage, self).__init__(
  111. paths_to_monitor,
  112. reset_at='epoch', emit_at='epoch', mode='train')
  113. def __str__(self):
  114. return "DiskUsage_Epoch"
  115. class ExperienceDiskUsage(DiskPluginMetric):
  116. """
  117. The average experience Disk usage metric.
  118. This plugin metric works only at eval time.
  119. At the end of each experience, this metric logs the total
  120. size (in KB) of all the monitored paths.
  121. """
  122. def __init__(self, paths_to_monitor):
  123. """
  124. Creates an instance of the experience Disk usage metric.
  125. """
  126. super(ExperienceDiskUsage, self).__init__(
  127. paths_to_monitor,
  128. reset_at='experience', emit_at='experience', mode='eval')
  129. def __str__(self):
  130. return "DiskUsage_Exp"
  131. class StreamDiskUsage(DiskPluginMetric):
  132. """
  133. The average stream Disk usage metric.
  134. This plugin metric works only at eval time.
  135. At the end of the eval stream, this metric logs the total
  136. size (in KB) of all the monitored paths.
  137. """
  138. def __init__(self, paths_to_monitor):
  139. """
  140. Creates an instance of the stream Disk usage metric.
  141. """
  142. super(StreamDiskUsage, self).__init__(
  143. paths_to_monitor,
  144. reset_at='stream', emit_at='stream', mode='eval')
  145. def __str__(self):
  146. return "DiskUsage_Stream"
  147. def disk_usage_metrics(*, paths_to_monitor=None, minibatch=False, epoch=False,
  148. experience=False, stream=False) \
  149. -> List[PluginMetric]:
  150. """
  151. Helper method that can be used to obtain the desired set of
  152. standalone metrics.
  153. :param minibatch: If True, will return a metric able to log the minibatch
  154. Disk usage
  155. :param epoch: If True, will return a metric able to log the epoch
  156. Disk usage
  157. :param experience: If True, will return a metric able to log the experience
  158. Disk usage.
  159. :param stream: If True, will return a metric able to log the evaluation
  160. stream Disk usage.
  161. :return: A list of plugin metrics.
  162. """
  163. metrics = []
  164. if minibatch:
  165. metrics.append(MinibatchDiskUsage(paths_to_monitor=paths_to_monitor))
  166. if epoch:
  167. metrics.append(EpochDiskUsage(paths_to_monitor=paths_to_monitor))
  168. if experience:
  169. metrics.append(ExperienceDiskUsage(paths_to_monitor=paths_to_monitor))
  170. if stream:
  171. metrics.append(StreamDiskUsage(paths_to_monitor=paths_to_monitor))
  172. return metrics
  173. __all__ = [
  174. 'DiskUsage',
  175. 'MinibatchDiskUsage',
  176. 'EpochDiskUsage',
  177. 'ExperienceDiskUsage',
  178. 'StreamDiskUsage',
  179. 'disk_usage_metrics'
  180. ]