edge.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. # -*- coding: utf-8 -*-
  2. from .__load__ import *
  3. import sys
  4. import asyncio
  5. import textwrap
  6. import subprocess
  7. import pysrt
  8. import json
  9. from edge_tts import Communicate
# Default maximum width, in characters, of each text chunk produced by
# split_text() before it is sent to the synthesizer.
MAX_CHARS = 300
  11. def output_json(success, message, mp3_path=None, srt_path=None):
  12. print(json.dumps({
  13. "success": success,
  14. "message": message,
  15. "mp3": mp3_path,
  16. "srt": srt_path
  17. }))
  18. sys.exit(0 if success else 1)
  19. # 参数校验
  20. if len(sys.argv) < 4:
  21. output_json(False, "Usage: python tts_split_and_merge.py <text_file> <voice> <output_dir>")
  22. TEXT_FILE = sys.argv[1]
  23. VOICE = sys.argv[2]
  24. OUTPUT_DIR = sys.argv[3]
  25. FINAL_MP3 = os.path.join(OUTPUT_DIR, "1.mp3")
  26. FINAL_SRT = os.path.join(OUTPUT_DIR, "1.srt")
  27. os.makedirs(OUTPUT_DIR, exist_ok=True)
  28. def split_text(text, max_len=MAX_CHARS):
  29. return textwrap.wrap(text, max_len, break_long_words=False, break_on_hyphens=False)
  30. async def synthesize_segments(segments):
  31. for i, segment in enumerate(segments):
  32. mp3_path = os.path.join(OUTPUT_DIR, f"part_{i}.mp3")
  33. srt_path = os.path.join(OUTPUT_DIR, f"part_{i}.srt")
  34. communicate = Communicate(text=segment, voice=VOICE)
  35. await communicate.save(mp3_path, srt_path)
  36. def merge_mp3_files():
  37. concat_list_path = os.path.join(OUTPUT_DIR, "concat_list.txt")
  38. with open(concat_list_path, "w", encoding="utf-8") as f:
  39. i = 0
  40. while True:
  41. mp3_file = os.path.join(OUTPUT_DIR, f"part_{i}.mp3")
  42. if not os.path.exists(mp3_file): break
  43. f.write(f"file '{os.path.abspath(mp3_file)}'\n")
  44. i += 1
  45. subprocess.run([
  46. "ffmpeg", "-f", "concat", "-safe", "0",
  47. "-i", concat_list_path, "-c", "copy", FINAL_MP3
  48. ], check=True)
  49. def merge_srt_files():
  50. subtitles = pysrt.SubRipFile()
  51. current_offset = 0
  52. i = 0
  53. while True:
  54. srt_path = os.path.join(OUTPUT_DIR, f"part_{i}.srt")
  55. if not os.path.exists(srt_path): break
  56. part_subs = pysrt.open(srt_path)
  57. for sub in part_subs:
  58. sub.shift(seconds=current_offset)
  59. if part_subs:
  60. last_end = part_subs[-1].end.ordinal / 1000.0
  61. current_offset += last_end
  62. subtitles.extend(part_subs)
  63. i += 1
  64. subtitles.save(FINAL_SRT, encoding='utf-8')
  65. async def main():
  66. try:
  67. with open(TEXT_FILE, "r", encoding="utf-8") as f:
  68. text = f.read()
  69. segments = split_text(text)
  70. await synthesize_segments(segments)
  71. merge_mp3_files()
  72. merge_srt_files()
  73. output_json(True, "Synthesis completed successfully", FINAL_MP3, FINAL_SRT)
  74. except Exception as e:
  75. output_json(False, f"Error: {str(e)}")
  76. if __name__ == "__main__":
  77. asyncio.run(main())