12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- # -*- coding: utf-8 -*-
- from .__load__ import *
- import sys
- import asyncio
- import textwrap
- import subprocess
- import pysrt
- import json
- from edge_tts import Communicate
- MAX_CHARS = 300
- def output_json(success, message, mp3_path=None, srt_path=None):
- print(json.dumps({
- "success": success,
- "message": message,
- "mp3": mp3_path,
- "srt": srt_path
- }))
- sys.exit(0 if success else 1)
- # 参数校验
- if len(sys.argv) < 4:
- output_json(False, "Usage: python tts_split_and_merge.py <text_file> <voice> <output_dir>")
- TEXT_FILE = sys.argv[1]
- VOICE = sys.argv[2]
- OUTPUT_DIR = sys.argv[3]
- FINAL_MP3 = os.path.join(OUTPUT_DIR, "1.mp3")
- FINAL_SRT = os.path.join(OUTPUT_DIR, "1.srt")
- os.makedirs(OUTPUT_DIR, exist_ok=True)
- def split_text(text, max_len=MAX_CHARS):
- return textwrap.wrap(text, max_len, break_long_words=False, break_on_hyphens=False)
- async def synthesize_segments(segments):
- for i, segment in enumerate(segments):
- mp3_path = os.path.join(OUTPUT_DIR, f"part_{i}.mp3")
- srt_path = os.path.join(OUTPUT_DIR, f"part_{i}.srt")
- communicate = Communicate(text=segment, voice=VOICE)
- await communicate.save(mp3_path, srt_path)
- def merge_mp3_files():
- concat_list_path = os.path.join(OUTPUT_DIR, "concat_list.txt")
- with open(concat_list_path, "w", encoding="utf-8") as f:
- i = 0
- while True:
- mp3_file = os.path.join(OUTPUT_DIR, f"part_{i}.mp3")
- if not os.path.exists(mp3_file): break
- f.write(f"file '{os.path.abspath(mp3_file)}'\n")
- i += 1
- subprocess.run([
- "ffmpeg", "-f", "concat", "-safe", "0",
- "-i", concat_list_path, "-c", "copy", FINAL_MP3
- ], check=True)
- def merge_srt_files():
- subtitles = pysrt.SubRipFile()
- current_offset = 0
- i = 0
- while True:
- srt_path = os.path.join(OUTPUT_DIR, f"part_{i}.srt")
- if not os.path.exists(srt_path): break
- part_subs = pysrt.open(srt_path)
- for sub in part_subs:
- sub.shift(seconds=current_offset)
- if part_subs:
- last_end = part_subs[-1].end.ordinal / 1000.0
- current_offset += last_end
- subtitles.extend(part_subs)
- i += 1
- subtitles.save(FINAL_SRT, encoding='utf-8')
- async def main():
- try:
- with open(TEXT_FILE, "r", encoding="utf-8") as f:
- text = f.read()
- segments = split_text(text)
- await synthesize_segments(segments)
- merge_mp3_files()
- merge_srt_files()
- output_json(True, "Synthesis completed successfully", FINAL_MP3, FINAL_SRT)
- except Exception as e:
- output_json(False, f"Error: {str(e)}")
- if __name__ == "__main__":
- asyncio.run(main())
|