# -*- coding: utf-8 -*-
from .__load__ import *


class Comfyui(object):
    """Build ffmpeg command lines, run them, and parse ffmpeg's stderr report.

    The names ``re``, ``subprocess``, ``datetime``, ``timezone`` and
    ``Demeter`` are not imported here explicitly; they are expected to come
    from the ``.__load__`` star import at the top of the file.
    """

    def info(self, file, check=False):
        """Return media information parsed from ffmpeg's stderr for ``file``.

        Runs ``act(file, False, False, check)`` (input only, no output file)
        and extracts fields from the returned text with regular expressions.

        Possible keys in the returned dict (each is present only when its
        pattern matched): ``format``, ``total_seconds``, ``duration``,
        ``bitrate``, ``video_stream``, ``video_codec``, ``resolution``,
        ``width``, ``height``, ``audio_stream``, ``audio_codec``,
        ``audio_sample_rate``, ``fps`` and ``frame``.

        Returns an empty dict when ``act`` yields nothing (for example when
        ``check`` is true and the command is only printed).
        """
        output = self.act(file, False, False, check)
        info = {}
        if not output:
            return info

        # Container format.
        format_info = re.search(r"Input #0, (.*?), from", output)
        if format_info:
            info['format'] = format_info.group(1)

        # Duration. The separator before the fraction is a literal dot, so it
        # is escaped; the fraction is scaled by its own digit count instead of
        # assuming exactly two digits.
        duration_match = re.search(r'Duration: (\d+):(\d+):(\d+)\.(\d+)', output)
        if duration_match:
            hours, minutes, seconds, fraction = duration_match.groups()
            info['total_seconds'] = (
                int(hours) * 3600
                + int(minutes) * 60
                + int(seconds)
                + int(fraction) / 10 ** len(fraction)
            )
            info['duration'] = hours + ':' + minutes + ':' + seconds + '.' + fraction

        # Overall bitrate.
        bitrate_info = re.search(r"bitrate: (\d+ kb/s)", output)
        if bitrate_info:
            info['bitrate'] = bitrate_info.group(1)

        # First video stream.
        video_stream_info = re.search(
            r"Stream #(\d+:\d+).*: Video: (.*?), (\d+x\d+)", output
        )
        if video_stream_info:
            info['video_stream'] = video_stream_info.group(1)
            info['video_codec'] = video_stream_info.group(2)
            resolution = video_stream_info.group(3)
            info['resolution'] = resolution
            width, height = resolution.split('x')
            info['width'] = int(width)
            info['height'] = int(height)

        # First audio stream.
        audio_stream_info = re.search(
            r"Stream #(\d+:\d+).*: Audio: (.*?), (\d+ Hz)", output
        )
        if audio_stream_info:
            info['audio_stream'] = audio_stream_info.group(1)
            info['audio_codec'] = audio_stream_info.group(2)
            info['audio_sample_rate'] = audio_stream_info.group(3)

        # Frame rate, plus an estimated frame count when a duration is known.
        fps_match = re.search(r'(\d+(\.\d+)?) fps', output)
        if fps_match:
            info['fps'] = float(fps_match.group(1))
            # Without a parsed duration there is nothing to multiply by;
            # skip the estimate instead of raising KeyError.
            if 'total_seconds' in info:
                info['frame'] = info['fps'] * info['total_seconds']

        return info

    def act(self, input, output, option=None, check=False):
        """Assemble an ffmpeg command, then print it or run it.

        Args:
            input: Passed to ``self.input`` (a path string or a dict with
                ``option`` and ``file`` keys).
            output: Passed to ``self.output`` when truthy; otherwise no output
                argument is added.
            option: Optional mapping of method name -> argument(s). Each key
                is looked up on ``self`` and called with the value (a
                non-list value is wrapped as a single argument); the returned
                fragment is appended to the command.
            check: When true, only print the command and return ``None``.

        Returns:
            The process's stderr text, or ``None`` when ``check`` is true.

        Side effects:
            Stores the list of command fragments on ``self.cmd``.
        """
        self.cmd = [Demeter.ffmpeg]
        self.cmd.append(self.input(input))
        if option:
            for name, args in option.items():
                if not isinstance(args, list):
                    args = [args]
                builder = getattr(self, name)
                self.cmd.append(builder(*args))
        if output:
            self.cmd.append(self.output(output))

        cmd = self.implode(' ', self.cmd)
        if check:
            print(cmd)
            return

        # Local imports keep this method self-contained regardless of what
        # the star import provides.
        import os
        import shlex

        # With shell=False, POSIX treats a plain string as the program name,
        # so the assembled command must be tokenized there. Windows accepts
        # the command string as-is, so it is left untouched.
        args = cmd if os.name == 'nt' else shlex.split(cmd)
        result = subprocess.run(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
            encoding='latin-1',
        )
        return result.stderr

    # Input video
    def input(self, video):
        """Return the ``-i`` fragment for ``video``.

        A dict must provide ``option`` (rendered via ``self.option``) and
        ``file``; a plain string is wrapped in double quotes.
        """
        if isinstance(video, dict):
            return self.option(video['option'], ' -i ' + video['file'])
        return '-i "' + video + '"'

    # Output video
    def output(self, video):
        """Return the output fragment for ``video``.

        A dict must provide ``option`` (rendered via ``self.option``, e.g. a
        bitrate such as ``-b:v 10000k``) and ``file``; a plain string is
        prefixed with ``-y``.
        """
        if isinstance(video, dict):
            return self.option(video['option'], ' ' + video['file'])
        return '-y ' + video

    # Cut by time
    def time(self, max, start=0):
        """Return ``-ss <start> -t <max>`` with both values as ``HH:MM:SS``.

        Both arguments are numbers of seconds formatted through
        ``self.gmdate``.
        NOTE(review): formatting goes through a UTC datetime, so values of
        24 hours or more would presumably wrap around - confirm if long
        durations matter.
        """
        return (
            '-ss ' + self.gmdate('%H:%M:%S', start)
            + ' -t ' + self.gmdate('%H:%M:%S', max)
        )

    # Video filters
    def video(self, video):
        """Return a ``-vf "..."`` fragment from a mapping of filter -> params.

        Returns an empty string when ``video`` is empty.
        """
        if video:
            parts = [self.filter('video', k, v) for k, v in video.items()]
            return '-vf "' + self.implode(',', parts) + '"'
        return ''

    # Audio filters
    def audio(self, audio):
        """Return an ``-af "..."`` fragment from a mapping of filter -> params.

        Returns an empty string when ``audio`` is empty.
        """
        # The original tested the undefined name `video` here, which raised
        # NameError on every call.
        if audio:
            parts = [self.filter('audio', k, v) for k, v in audio.items()]
            return '-af "' + self.implode(',', parts) + '"'
        return ''

    # Complex filters
    def filter_complex(self, param):
        """Return a ``-filter_complex "..."`` fragment.

        The values of ``param`` are joined with ``;``; the keys are ignored.
        """
        return '-filter_complex "' + self.implode(';', param.values()) + '"'

    # Simple filter
    def filter(self, type, method, param):
        """Render one filter expression.

        If the service returned by ``Demeter.service(type, 'filter')`` has an
        attribute named ``method``, it is called with ``param`` (a non-list
        value is passed as a single argument) and its result is returned.

        Otherwise a generic ``name=args`` expression is built: a dict becomes
        ``k=v`` pairs joined by ``:``, a list becomes its items joined by
        ``:``, and any other non-empty value is used as-is. ``None`` and the
        empty string produce the bare filter name.
        """
        service = Demeter.service(type, 'filter')
        if hasattr(service, method):
            handler = getattr(service, method)
            if not isinstance(param, list):
                param = [param]
            return handler(*param)

        # The original checked for `list` and then called `.items()`, which
        # lists do not have; the key/value form only makes sense for a dict.
        if isinstance(param, dict):
            pairs = []
            for key, value in param.items():
                if value is None or value == '':
                    pairs.append(str(key))
                else:
                    pairs.append(str(key) + '=' + str(value))
            param = self.implode(':', pairs)
        elif isinstance(param, list):
            param = self.implode(':', param)

        if param is None or param == '':
            return method
        return method + '=' + str(param)

    # Options
    def option(self, option, suffix=''):
        """Render a mapping of options as command-line text, then add ``suffix``.

        A non-empty string key ``k`` becomes ``-<alias(k)> <value>``; any
        other key contributes only its value. ``None`` and empty-string
        values add nothing after the flag. Values are converted with
        ``str`` so numbers are accepted.
        """
        parts = []
        if option:
            for key, value in option.items():
                if value is None or value == '':
                    value = ''
                else:
                    value = ' ' + str(value)
                if isinstance(key, str) and key:
                    parts.append('-' + self.alias(key) + value)
                else:
                    parts.append(value)
        return self.implode(' ', parts) + suffix

    # Aliases
    def alias(self, k):
        """Map ``audio`` -> ``b:a`` and ``video`` -> ``b:v``; else return ``k``."""
        if k == 'audio':
            return 'b:a'
        if k == 'video':
            return 'b:v'
        return k

    def implode(self, stn, lst):
        """Join the items of ``lst`` with ``stn``, converting each via ``str``."""
        return stn.join(map(str, lst))

    def gmdate(self, format, start):
        """Format the timestamp ``start`` (seconds) as a UTC time string."""
        # Convert the timestamp to a timezone-aware UTC datetime, then format.
        dt = datetime.fromtimestamp(start, tz=timezone.utc)
        return dt.strftime(format)