comfyui.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. # -*- coding: utf-8 -*-
  2. from .__load__ import *
  3. class Comfyui(object):
  4. def info(self, file, check = False):
  5. output = self.act(file, False, False, check)
  6. info = {}
  7. if output:
  8. # 提取格式信息
  9. format_info = re.search(r"Input #0, (.*?), from", output)
  10. if format_info:
  11. info['format'] = format_info.group(1)
  12. # 提取时长信息
  13. duration_match = re.search(r'Duration: (\d+):(\d+):(\d+).(\d+)', output)
  14. if duration_match:
  15. hours = duration_match.group(1)
  16. minutes = duration_match.group(2)
  17. seconds = duration_match.group(3)
  18. milliseconds = duration_match.group(4)
  19. info['total_seconds'] = int(hours) * 3600 + int(minutes) * 60 + int(seconds) + int(milliseconds) / 100
  20. info['duration'] = hours + ':' + minutes + ':' + seconds + '.' + milliseconds
  21. # 提取比特率信息
  22. bitrate_info = re.search(r"bitrate: (\d+ kb/s)", output)
  23. if bitrate_info:
  24. info['bitrate'] = bitrate_info.group(1)
  25. # 提取视频流信息
  26. video_stream_info = re.search(r"Stream #(\d+:\d+).*: Video: (.*?), (\d+x\d+)", output)
  27. if video_stream_info:
  28. info['video_stream'] = video_stream_info.group(1)
  29. info['video_codec'] = video_stream_info.group(2)
  30. resolution = video_stream_info.group(3)
  31. info['resolution'] = resolution
  32. width, height = resolution.split('x')
  33. info['width'] = int(width)
  34. info['height'] = int(height)
  35. # 提取音频流信息
  36. audio_stream_info = re.search(r"Stream #(\d+:\d+).*: Audio: (.*?), (\d+ Hz)", output)
  37. if audio_stream_info:
  38. info['audio_stream'] = audio_stream_info.group(1)
  39. info['audio_codec'] = audio_stream_info.group(2)
  40. info['audio_sample_rate'] = audio_stream_info.group(3)
  41. match = re.search(r'(\d+(\.\d+)?) fps', output)
  42. if match:
  43. info['fps'] = float(match.group(1))
  44. info['frame'] = info['fps'] * info['total_seconds']
  45. return info
  46. def act(self, input, output, option = {}, check = False):
  47. self.cmd = [Demeter.ffmpeg]
  48. self.cmd.append(self.input(input))
  49. if option:
  50. for k, v in option.items():
  51. if not isinstance(v, list):
  52. v = [v]
  53. method = getattr(self, k)
  54. self.cmd.append(method(*v))
  55. if output:
  56. self.cmd.append(self.output(output))
  57. cmd = self.implode(' ', self.cmd)
  58. if check:
  59. print(cmd);
  60. return
  61. result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, encoding='latin-1')
  62. return result.stderr
  63. # 获取视频
  64. def input(self, video):
  65. if isinstance(video, dict):
  66. return self.option(video['option'], ' -i ' + video['file'])
  67. return '-i "' + video + '"'
  68. # 输出视频
  69. def output(self, video):
  70. #'-b:v 10000k'
  71. if isinstance(video, dict):
  72. return self.option(video['option'], ' ' + video['file'])
  73. return '-y ' + video
  74. # 按照时间截取
  75. def time(self, max, start = 0):
  76. return '-ss ' + self.gmdate('%H:%M:%S', start) + ' -t ' + self.gmdate('%H:%M:%S', max)
  77. # video过滤器
  78. def video(self, video):
  79. if video:
  80. cmd = []
  81. for k, v in video.items():
  82. cmd.append(self.filter('video', k, v))
  83. return '-vf "'+self.implode(',', cmd)+'"'
  84. return ''
  85. # audio过滤器
  86. def audio(self, audio):
  87. if video:
  88. cmd = []
  89. for k, v in audio.items():
  90. cmd.append(self.filter('audio', k, v))
  91. return '-af "'+self.implode(',', cmd)+'"'
  92. return ''
  93. # 复杂过滤器
  94. def filter_complex(self, param):
  95. cmd = []
  96. for k, v in param.items():
  97. cmd.append(v)
  98. return '-filter_complex "'+self.implode(';', cmd)+'"'
  99. # 过滤器 -简单过滤器
  100. def filter(self, type, method, param):
  101. service = Demeter.service(type, 'filter')
  102. if hasattr(service, method):
  103. method = getattr(service, method)
  104. if not isinstance(param, list):
  105. param = [param]
  106. return method(*param)
  107. else:
  108. if isinstance(param, list):
  109. tmp = []
  110. for k, v in param.items():
  111. if v:
  112. v = '=' + v
  113. tmp.append(k + v)
  114. param = self.implode(':', tmp)
  115. if param:
  116. param = '=' + param
  117. return method + param
  118. # 获取选项
  119. def option(self, option, suffix = ''):
  120. tmp = []
  121. if option:
  122. for k, v in option.items():
  123. if v:
  124. v = ' ' + v
  125. if isinstance(k, str) and k:
  126. tmp.append('-' + self.alias(k) + v)
  127. else:
  128. tmp.append(v)
  129. return self.implode(' ', tmp) + suffix
  130. # 设置别名
  131. def alias(self, k):
  132. if k == 'audio':
  133. return 'b:a'
  134. if k == 'video':
  135. return 'b:v'
  136. return k
  137. def implode(self, stn, lst):
  138. return stn.join(map(str, lst))
  139. def gmdate(self, format, start):
  140. # 将给定的时间戳转换为 datetime 对象
  141. dt = datetime.fromtimestamp(start, tz=timezone.utc)
  142. # 使用 strftime 方法格式化 datetime 对象
  143. return dt.strftime(format)