123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- # -*- coding: utf-8 -*-
- from .__load__ import *
class Comfyui(object):
    """Assemble ffmpeg command lines, run them and parse the resulting report.

    Every builder method (``input``, ``output``, ``time``, ``video``,
    ``audio``, ``filter_complex``) returns one string fragment of the command
    line.  ``act`` joins the fragments and runs the command; ``info`` parses
    the text that ``act`` returns.
    """

    def info(self, file, check=False):
        """Return a dict describing ``file``, parsed from the text ``act`` returns.

        Keys are only present when the matching pattern is found:
        ``format``, ``total_seconds``, ``duration``, ``bitrate``,
        ``video_stream``, ``video_codec``, ``resolution``, ``width``,
        ``height``, ``audio_stream``, ``audio_codec``, ``audio_sample_rate``,
        ``fps`` and ``frame``.  An empty dict is returned when ``act`` yields
        nothing (which is always the case when ``check`` is true).
        """
        output = self.act(file, False, False, check)
        info = {}
        if not output:
            return info

        # Container format.
        format_info = re.search(r"Input #0, (.*?), from", output)
        if format_info:
            info['format'] = format_info.group(1)

        # Duration.  The separator before the fraction is a literal dot, and
        # the fraction is scaled by its own digit count instead of assuming
        # exactly two digits.
        duration_match = re.search(r'Duration: (\d+):(\d+):(\d+)\.(\d+)', output)
        if duration_match:
            hours, minutes, seconds, fraction = duration_match.groups()
            info['total_seconds'] = (
                int(hours) * 3600
                + int(minutes) * 60
                + int(seconds)
                + int(fraction) / 10 ** len(fraction)
            )
            info['duration'] = hours + ':' + minutes + ':' + seconds + '.' + fraction

        # Overall bitrate.
        bitrate_info = re.search(r"bitrate: (\d+ kb/s)", output)
        if bitrate_info:
            info['bitrate'] = bitrate_info.group(1)

        # First video stream.
        video_stream_info = re.search(r"Stream #(\d+:\d+).*: Video: (.*?), (\d+x\d+)", output)
        if video_stream_info:
            info['video_stream'] = video_stream_info.group(1)
            info['video_codec'] = video_stream_info.group(2)
            resolution = video_stream_info.group(3)
            info['resolution'] = resolution
            width, height = resolution.split('x')
            info['width'] = int(width)
            info['height'] = int(height)

        # First audio stream.
        audio_stream_info = re.search(r"Stream #(\d+:\d+).*: Audio: (.*?), (\d+ Hz)", output)
        if audio_stream_info:
            info['audio_stream'] = audio_stream_info.group(1)
            info['audio_codec'] = audio_stream_info.group(2)
            info['audio_sample_rate'] = audio_stream_info.group(3)

        # Frame rate, plus an estimated frame count when the duration is known.
        match = re.search(r'(\d+(\.\d+)?) fps', output)
        if match:
            info['fps'] = float(match.group(1))
            # The duration may be missing (e.g. "Duration: N/A"); skip the
            # estimate then instead of raising KeyError.
            if 'total_seconds' in info:
                info['frame'] = info['fps'] * info['total_seconds']
        return info

    def act(self, input, output, option=None, check=False):
        """Build the command line and run it.

        ``option`` maps a builder method name of this class to its argument,
        or to a list of positional arguments.  The assembled fragments are
        kept on ``self.cmd``.

        When ``check`` is true the command is printed and ``None`` is
        returned without running anything; otherwise the captured stderr
        text of the process is returned.
        """
        # Local imports: the block cannot rely on the file-level wildcard
        # import providing these two modules.
        import os
        import shlex

        self.cmd = [Demeter.ffmpeg]
        self.cmd.append(self.input(input))
        if option:
            for name, args in option.items():
                if not isinstance(args, list):
                    args = [args]
                self.cmd.append(getattr(self, name)(*args))
        if output:
            self.cmd.append(self.output(output))
        cmd = self.implode(' ', self.cmd)
        if check:
            print(cmd)
            return None
        # On POSIX a plain string is treated as the program name, so a
        # command with arguments must be split into a list.  On Windows the
        # string is passed through unchanged, as before.
        args = cmd if os.name == 'nt' else shlex.split(cmd)
        result = subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
            encoding='latin-1',
        )
        return result.stderr

    # Input file
    def input(self, video):
        """Return the ``-i`` fragment for a path or a ``{'option', 'file'}`` dict."""
        if isinstance(video, dict):
            return self.option(video.get('option'), ' -i ' + video['file'])
        return '-i "' + video + '"'

    # Output file
    def output(self, video):
        """Return the output fragment for a path or a ``{'option', 'file'}`` dict."""
        if isinstance(video, dict):
            return self.option(video.get('option'), ' ' + video['file'])
        return '-y ' + video

    # Cut by time
    def time(self, max, start=0):
        """Return ``-ss <start> -t <max>``, both formatted as ``HH:MM:SS``."""
        return '-ss ' + self.gmdate('%H:%M:%S', start) + ' -t ' + self.gmdate('%H:%M:%S', max)

    # Video filters
    def video(self, video):
        """Return the ``-vf "..."`` fragment for a filter mapping, or ``''`` if empty."""
        return self._filter_chain('video', '-vf', video)

    # Audio filters
    def audio(self, audio):
        """Return the ``-af "..."`` fragment for a filter mapping, or ``''`` if empty."""
        return self._filter_chain('audio', '-af', audio)

    def _filter_chain(self, kind, flag, filters):
        """Render each ``{name: param}`` entry with ``filter`` and join them with commas."""
        if not filters:
            return ''
        chain = [self.filter(kind, name, param) for name, param in filters.items()]
        return flag + ' "' + self.implode(',', chain) + '"'

    # Complex filter graph
    def filter_complex(self, param):
        """Return ``-filter_complex "..."`` from the mapping's values joined by ``;``."""
        return '-filter_complex "' + self.implode(';', param.values()) + '"'

    # Simple filter
    def filter(self, type, method, param):
        """Render one filter.

        If the object returned by ``Demeter.service(type, 'filter')`` has an
        attribute named ``method``, it is called with ``param`` (unpacked
        when it is a list) and its result is returned.  Otherwise the filter
        is rendered generically by ``_format_filter``.
        """
        service = Demeter.service(type, 'filter')
        if hasattr(service, method):
            handler = getattr(service, method)
            if not isinstance(param, list):
                param = [param]
            return handler(*param)
        return self._format_filter(method, param)

    @staticmethod
    def _format_filter(name, param):
        """Render ``name`` or ``name=<args>`` for a filter without a dedicated handler.

        A dict becomes ``key=value`` pairs joined by ``:`` (a key with an
        empty value is emitted bare), a list or tuple becomes its items
        joined by ``:``, and any other non-empty value is used as-is.
        """
        if isinstance(param, dict):
            pairs = []
            for key, value in param.items():
                rendered = '' if value is None or value == '' else '=' + str(value)
                pairs.append(str(key) + rendered)
            param = ':'.join(pairs)
        elif isinstance(param, (list, tuple)):
            param = ':'.join(map(str, param))
        if param is None or param == '':
            return name
        return name + '=' + str(param)

    # Options
    def option(self, option, suffix=''):
        """Render a mapping as ``-key value`` pairs joined by spaces, then add ``suffix``.

        Keys go through ``alias``.  A value of ``None`` or ``''`` yields a
        bare flag; other values are converted with ``str``.  An entry whose
        key is not a non-empty string contributes only its value.
        """
        parts = []
        if option:
            for key, value in option.items():
                rendered = '' if value is None or value == '' else ' ' + str(value)
                if isinstance(key, str) and key:
                    parts.append('-' + self.alias(key) + rendered)
                else:
                    parts.append(rendered)
        return self.implode(' ', parts) + suffix

    # Aliases
    def alias(self, k):
        """Map ``'audio'`` to ``'b:a'`` and ``'video'`` to ``'b:v'``; return other keys unchanged."""
        if k == 'audio':
            return 'b:a'
        if k == 'video':
            return 'b:v'
        return k

    def implode(self, stn, lst):
        """Join the items of ``lst`` with ``stn`` after converting each to ``str``."""
        return stn.join(map(str, lst))

    def gmdate(self, format, start):
        """Format the timestamp ``start`` (seconds) in UTC using ``format``."""
        # Convert the timestamp to a timezone-aware datetime, then format it.
        dt = datetime.fromtimestamp(start, tz=timezone.utc)
        return dt.strftime(format)
|