123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- # -*- coding: utf-8 -*-
- from .__load__ import *
- import fitz
- import re
class Pdf(Base):
    """PDF loader built on PyMuPDF (``fitz``).

    ``json()`` renders every page to a PNG and extracts per-word bounding
    boxes (optionally aligned with audio segments); ``doc()`` extracts page
    text plus OCR'd image text as langchain ``Document`` objects.

    NOTE(review): ``Base``, ``Demeter``, ``Image``, ``io``, ``ocr`` and
    ``langchain`` are not imported explicitly in this file; presumably they
    come from ``from .__load__ import *`` -- confirm.
    """

    def json(self):
        """Render page images and extract word boxes from ``self.file``.

        Returns:
            ``False`` when ``self.file`` is empty. Otherwise a dict with
            ``total`` (page count), ``pages`` (one entry per page holding the
            rendered image location and its pixel size) and ``text``.
            Without audio, ``text`` lists every word with its page number and
            a bounding box expressed as percentages of the page size. With
            audio, ``text`` only holds the words matched to an audio segment,
            each extended with the segment's ``start`` and ``end``.
        """
        if not self.file:
            return False

        self.audio = ''
        if 'audio' in self.param and self.param['audio']:
            # An audio file was supplied: run it through the extract service.
            self.audio = Demeter.service('loader', 'extract').get(self.param['audio']).json()

        self.getPath()

        scale = 2.0
        # The zoom matrix is the same for every page, so build it once.
        zoom = fitz.Matrix(scale, scale)

        # The context manager closes the document even if rendering fails.
        with fitz.open(self.file) as pdf:
            result = {'total': pdf.page_count, 'pages': [], 'text': []}
            for page_num in range(pdf.page_count):
                page = page_num + 1
                page_obj = pdf.load_page(page_num)
                page_width, page_height = page_obj.rect.width, page_obj.rect.height

                # Each word is [x0, y0, x1, y1, "word_text", block_no, line_no, word_no].
                for x0, y0, x1, y1, word, *_ in page_obj.get_text("words"):
                    result['text'].append({
                        "page": page,
                        "text": word,
                        # Percentages of the page size, convenient for
                        # front-end highlighting.
                        "bbox": [
                            (x0 / page_width) * 100,
                            (y0 / page_height) * 100,
                            (x1 / page_width) * 100,
                            (y1 / page_height) * 100,
                        ],
                    })

                # Render the page to a PNG file.
                pix = page_obj.get_pixmap(matrix=zoom, alpha=False)
                cover_file = f"{self.param['path']}cover_page_{page}.png"
                pix.save(cover_file)
                if 'host' in self.param and self.param['host']:
                    # Swap the local runtime prefix for the configured host.
                    cover_file = cover_file.replace(Demeter.path + 'runtime/', self.param['host'])
                result['pages'].append({
                    "page": page,
                    "img": cover_file,
                    "width": page_width * scale,
                    "height": page_height * scale,
                })

        if self.audio:
            # Keep only the words that could be matched to an audio segment.
            matched = []
            for segment in self.audio:
                hit = self.find(result['text'], segment)
                if hit:
                    matched.append(hit)
            result['text'] = matched
        return result

    def find(self, text, audio):
        """Find and consume the first word that contains the audio text.

        Args:
            text: list of word dicts (``page``, ``text``, ``bbox``). The
                matched entry is removed from this list in place so it cannot
                be matched again by a later segment.
            audio: dict with ``text``, ``start`` and ``end`` keys.

        Returns:
            A dict combining the matched word with the segment's ``start`` and
            ``end``, or ``False`` when nothing matches.
        """
        needle = audio['text'].strip().lower()
        if not needle:
            # An empty string is a substring of every word and would
            # wrongly consume the first candidate.
            return False

        for idx, words in enumerate(text):
            # Words on the first page are never matched.
            if words['page'] <= 1:
                continue
            if needle in words['text'].lower():
                text_item = {
                    "page": words['page'],
                    "text": words['text'],
                    "bbox": words['bbox'],
                    "start": audio['start'],
                    "end": audio['end'],
                }
                # Remove by index (not by value) so identical dicts are not
                # confused; safe because we return right after mutating.
                text.pop(idx)
                return text_item
        return False

    def doc(self):
        """Extract every page as a langchain ``Document``.

        Each document's content is the page's text followed by the text
        recognized in the page's embedded images.

        Returns:
            ``False`` when ``self.file`` is empty, otherwise a dict with
            ``page`` (the page count) and ``content`` (one ``Document`` per
            page, in page order).
        """
        if not self.file:
            return False

        with fitz.open(self.file) as pdf:
            result = {'page': pdf.page_count, 'content': []}
            for page_num in range(pdf.page_count):
                page = pdf.load_page(page_num)

                # Plain text of the page.
                text = page.get_text()

                # Text recognized inside embedded images.
                image_texts = []
                for img in page.get_images(full=True):
                    xref = img[0]
                    base_image = pdf.extract_image(xref)
                    image = Image.open(io.BytesIO(base_image["image"]))
                    ocr_result = ocr.ocr(image)
                    # Skip images where OCR produced nothing.
                    if not ocr_result or not ocr_result[0]:
                        continue
                    for line in ocr_result[0]:
                        recognized = line[1]
                        # NOTE(review): some OCR backends return
                        # (text, confidence) here instead of a bare string --
                        # confirm which one `ocr` is.
                        if isinstance(recognized, (tuple, list)):
                            recognized = recognized[0]
                        image_texts.append(recognized)

                # Page text + image text.
                full_text = text.strip() + "\n" + "\n".join(image_texts)
                result['content'].append(langchain.schema.Document(page_content=full_text))
        return result

    def clean_text(self, s):
        """Return ``s`` with ASCII control characters removed.

        Tab (``\\x09``), line feed (``\\x0A``) and carriage return (``\\x0D``)
        are kept.
        """
        return re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', s)
|