platform.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. # -*- coding: utf-8 -*-
  2. #
  3. # PySceneDetect: Python-Based Video Scene Detector
  4. # -------------------------------------------------------------------
  5. # [ Site: https://scenedetect.com ]
  6. # [ Docs: https://scenedetect.com/docs/ ]
  7. # [ Github: https://github.com/Breakthrough/PySceneDetect/ ]
  8. #
  9. # Copyright (C) 2014-2024 Brandon Castellano <http://www.bcastell.com>.
  10. # PySceneDetect is licensed under the BSD 3-Clause License; see the
  11. # included LICENSE file, or visit one of the above pages for details.
  12. #
  13. """``scenedetect.platform`` Module
  14. This moduke contains all platform/library specific compatibility fixes, as well as some utility
  15. functions to handle logging and invoking external commands.
  16. """
  17. import importlib
  18. import logging
  19. import os
  20. import os.path
  21. import platform
  22. import re
  23. import string
  24. import subprocess
  25. import sys
  26. from typing import AnyStr, Dict, List, Optional, Union
  27. import cv2
  28. from demeter.core import *
  29. ##
  30. ## tqdm Library
  31. ##
  32. class FakeTqdmObject:
  33. """Provides a no-op tqdm-like object."""
  34. # pylint: disable=unused-argument
  35. def __init__(self, **kawrgs):
  36. """No-op."""
  37. def update(self, n=1):
  38. """No-op."""
  39. def close(self):
  40. """No-op."""
  41. def set_description(self, desc=None, refresh=True):
  42. """No-op."""
  43. # pylint: enable=unused-argument
  44. class FakeTqdmLoggingRedirect:
  45. """Provides a no-op tqdm context manager for redirecting log messages."""
  46. # pylint: disable=redefined-builtin,unused-argument
  47. def __init__(self, **kawrgs):
  48. """No-op."""
  49. def __enter__(self):
  50. """No-op."""
  51. def __exit__(self, type, value, traceback):
  52. """No-op."""
  53. # pylint: enable=redefined-builtin,unused-argument
  54. # Try to import tqdm and the logging redirect, otherwise provide fake implementations..
  55. try:
  56. # pylint: disable=unused-import
  57. from tqdm import tqdm
  58. from tqdm.contrib.logging import logging_redirect_tqdm
  59. # pylint: enable=unused-import
  60. except ModuleNotFoundError:
  61. # pylint: disable=invalid-name
  62. tqdm = FakeTqdmObject
  63. logging_redirect_tqdm = FakeTqdmLoggingRedirect
  64. # pylint: enable=invalid-name
  65. ##
  66. ## OpenCV imwrite Supported Image Types & Quality/Compression Parameters
  67. ##
  68. # TODO: Move this into scene_manager.
  69. def get_cv2_imwrite_params() -> Dict[str, Union[int, None]]:
  70. """ Get OpenCV imwrite Params: Returns a dict of supported image formats and
  71. their associated quality/compression parameter index, or None if that format
  72. is not supported.
  73. Returns:
  74. Dictionary of supported image formats/extensions ('jpg', 'png', etc...) mapped to the
  75. respective OpenCV quality or compression parameter as {'jpg': cv2.IMWRITE_JPEG_QUALITY,
  76. 'png': cv2.IMWRITE_PNG_COMPRESSION, ...}. Parameter will be None if not found on the
  77. current system library (e.g. {'jpg': None}).
  78. """
  79. def _get_cv2_param(param_name: str) -> Union[int, None]:
  80. if param_name.startswith('CV_'):
  81. param_name = param_name[3:]
  82. try:
  83. return getattr(cv2, param_name)
  84. except AttributeError:
  85. return None
  86. return {
  87. 'jpg': _get_cv2_param('IMWRITE_JPEG_QUALITY'),
  88. 'png': _get_cv2_param('IMWRITE_PNG_COMPRESSION'),
  89. 'webp': _get_cv2_param('IMWRITE_WEBP_QUALITY')
  90. }
  91. ##
  92. ## File I/O
  93. ##
  94. def get_file_name(file_path: AnyStr, include_extension=True) -> AnyStr:
  95. """Return the file name that `file_path` refers to, optionally removing the extension.
  96. If `include_extension` is False, the result will always be a str.
  97. E.g. /tmp/foo.bar -> foo"""
  98. file_name = os.path.basename(file_path)
  99. if not include_extension:
  100. file_name = str(file_name)
  101. last_dot_pos = file_name.rfind('.')
  102. if last_dot_pos >= 0:
  103. file_name = file_name[:last_dot_pos]
  104. return file_name
  105. def get_and_create_path(file_path: AnyStr, output_directory: Optional[AnyStr] = None) -> AnyStr:
  106. """ Get & Create Path: Gets and returns the full/absolute path to file_path
  107. in the specified output_directory if set, creating any required directories
  108. along the way.
  109. If file_path is already an absolute path, then output_directory is ignored.
  110. Arguments:
  111. file_path: File name to get path for. If file_path is an absolute
  112. path (e.g. starts at a drive/root), no modification of the path
  113. is performed, only ensuring that all output directories are created.
  114. output_dir: An optional output directory to override the
  115. directory of file_path if it is relative to the working directory.
  116. Returns:
  117. Full path to output file suitable for writing.
  118. """
  119. # If an output directory is defined and the file path is a relative path, open
  120. # the file handle in the output directory instead of the working directory.
  121. if output_directory is not None and not os.path.isabs(file_path):
  122. file_path = os.path.join(output_directory, file_path)
  123. # Now that file_path is an absolute path, let's make sure all the directories
  124. # exist for us to start writing files there.
  125. os.makedirs(os.path.split(os.path.abspath(file_path))[0], exist_ok=True)
  126. return file_path
  127. ##
  128. ## Logging
  129. ##
  130. def init_logger(log_level: int = logging.INFO,
  131. show_stdout: bool = False,
  132. log_file: Optional[str] = None):
  133. """Initializes logging for PySceneDetect. The logger instance used is named 'pyscenedetect'.
  134. By default the logger has no handlers to suppress output. All existing log handlers are replaced
  135. every time this function is invoked.
  136. Arguments:
  137. log_level: Verbosity of log messages. Should be one of [logging.INFO, logging.DEBUG,
  138. logging.WARNING, logging.ERROR, logging.CRITICAL].
  139. show_stdout: If True, add handler to show log messages on stdout (default: False).
  140. log_file: If set, add handler to dump debug log messages to given file path.
  141. """
  142. # Format of log messages depends on verbosity.
  143. INFO_TEMPLATE = '[PySceneDetect] %(message)s'
  144. DEBUG_TEMPLATE = '%(levelname)s: %(module)s.%(funcName)s(): %(message)s'
  145. # Get the named logger and remove any existing handlers.
  146. logger_instance = logging.getLogger('pyscenedetect')
  147. logger_instance.handlers = []
  148. logger_instance.setLevel(log_level)
  149. # Add stdout handler if required.
  150. if show_stdout:
  151. handler = logging.StreamHandler(stream=sys.stdout)
  152. handler.setLevel(log_level)
  153. handler.setFormatter(
  154. logging.Formatter(fmt=DEBUG_TEMPLATE if log_level == logging.DEBUG else INFO_TEMPLATE))
  155. logger_instance.addHandler(handler)
  156. # Add debug log handler if required.
  157. if log_file:
  158. log_file = get_and_create_path(log_file)
  159. handler = logging.FileHandler(log_file)
  160. handler.setLevel(logging.DEBUG)
  161. handler.setFormatter(logging.Formatter(fmt=DEBUG_TEMPLATE))
  162. logger_instance.addHandler(handler)
  163. ##
  164. ## Running External Commands
  165. ##
  166. class CommandTooLong(Exception):
  167. """Raised if the length of a command line argument exceeds the limit allowed on Windows."""
  168. def invoke_command(args: List[str]) -> int:
  169. """Same as calling Python's subprocess.call() method, but explicitly
  170. raises a different exception when the command length is too long.
  171. See https://github.com/Breakthrough/PySceneDetect/issues/164 for details.
  172. Arguments:
  173. args: List of strings to pass to subprocess.call().
  174. Returns:
  175. Return code of command.
  176. Raises:
  177. CommandTooLong: `args` exceeds built in command line length limit on Windows.
  178. """
  179. try:
  180. return subprocess.call(args)
  181. except OSError as err:
  182. if os.name != 'nt':
  183. raise
  184. exception_string = str(err)
  185. # Error 206: The filename or extension is too long
  186. # Error 87: The parameter is incorrect
  187. to_match = ('206', '87')
  188. if any([x in exception_string for x in to_match]):
  189. raise CommandTooLong() from err
  190. raise
  191. def get_ffmpeg_path() -> Optional[str]:
  192. """Get path to ffmpeg if available on the current system. First looks at PATH, then checks if
  193. one is available from the `imageio_ffmpeg` package. Returns None if ffmpeg couldn't be found.
  194. """
  195. # Try invoking ffmpeg with the current environment.
  196. try:
  197. '''
  198. subprocess.call(['ffmpeg', '-v', 'quiet'])
  199. return 'ffmpeg'
  200. '''
  201. subprocess.call([Demeter.ffmpeg, '-v', 'quiet'])
  202. return Demeter.ffmpeg
  203. except OSError:
  204. pass # Failed to invoke ffmpeg with current environment, try another possibility.
  205. # Try invoking ffmpeg using the one from `imageio_ffmpeg` if available.
  206. try:
  207. # pylint: disable=import-outside-toplevel
  208. from imageio_ffmpeg import get_ffmpeg_exe
  209. # pylint: enable=import-outside-toplevel
  210. subprocess.call([get_ffmpeg_exe(), '-v', 'quiet'])
  211. return get_ffmpeg_exe()
  212. # Gracefully handle case where imageio_ffmpeg is not available.
  213. except ModuleNotFoundError:
  214. pass
  215. # Handle case where path might be wrong/non-existent.
  216. except OSError:
  217. pass
  218. # get_ffmpeg_exe may throw a RuntimeError if the executable is not available.
  219. except RuntimeError:
  220. pass
  221. return None
  222. def get_ffmpeg_version() -> Optional[str]:
  223. """Get ffmpeg version identifier, or None if ffmpeg is not found. Uses `get_ffmpeg_path()`."""
  224. ffmpeg_path = get_ffmpeg_path()
  225. if ffmpeg_path is None:
  226. return None
  227. # If get_ffmpeg_path() returns a value, the path it returns should be invocable.
  228. output = subprocess.check_output(args=[ffmpeg_path, '-version'], text=True)
  229. output_split = output.split()
  230. if len(output_split) >= 3 and output_split[1] == 'version':
  231. return output_split[2]
  232. # If parsing the version fails, return the entire first line of output.
  233. return output.splitlines()[0]
  234. def get_mkvmerge_version() -> Optional[str]:
  235. """Get mkvmerge version identifier, or None if mkvmerge is not found in PATH."""
  236. tool_name = 'mkvmerge'
  237. try:
  238. output = subprocess.check_output(args=[tool_name, '--version'], text=True)
  239. except FileNotFoundError:
  240. # mkvmerge doesn't exist on the system
  241. return None
  242. output_split = output.split()
  243. if len(output_split) >= 1 and output_split[0] == tool_name:
  244. return ' '.join(output_split[1:])
  245. # If parsing the version fails, return the entire first line of output.
  246. return output.splitlines()[0]
  247. def get_system_version_info() -> str:
  248. """Get the system's operating system, Python, packages, and external tool versions.
  249. Useful for debugging or filing bug reports.
  250. Used for the `scenedetect version -a` command.
  251. """
  252. output_template = '{:<12} {}'
  253. line_separator = '-' * 60
  254. not_found_str = 'Not Installed'
  255. out_lines = []
  256. # System (Python, OS)
  257. out_lines += ['System Info', line_separator]
  258. out_lines += [
  259. output_template.format(name, version) for name, version in (
  260. ('OS', '%s' % platform.platform()),
  261. ('Python', '%d.%d.%d' % sys.version_info[0:3]),
  262. )
  263. ]
  264. # Third-Party Packages
  265. out_lines += ['', 'Packages', line_separator]
  266. third_party_packages = (
  267. 'av',
  268. 'click',
  269. 'cv2',
  270. 'moviepy',
  271. 'numpy',
  272. 'platformdirs',
  273. 'scenedetect',
  274. 'tqdm',
  275. )
  276. for module_name in third_party_packages:
  277. try:
  278. module = importlib.import_module(module_name)
  279. out_lines.append(output_template.format(module_name, module.__version__))
  280. except ModuleNotFoundError:
  281. out_lines.append(output_template.format(module_name, not_found_str))
  282. # External Tools
  283. out_lines += ['', 'Tools', line_separator]
  284. tool_version_info = (
  285. ('ffmpeg', get_ffmpeg_version()),
  286. ('mkvmerge', get_mkvmerge_version()),
  287. )
  288. for (tool_name, tool_version) in tool_version_info:
  289. out_lines.append(
  290. output_template.format(tool_name, tool_version if tool_version else not_found_str))
  291. return '\n'.join(out_lines)
  292. class Template(string.Template):
  293. """Template matcher used to replace instances of $TEMPLATES in filenames."""
  294. idpattern = '[A-Z0-9_]+'
  295. flags = re.ASCII