123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- # -*- coding: utf-8 -*-
- from .__load__ import *
- #from langchain_community.document_loaders import PyPDFLoader
- import fitz
- #from PIL import Image
class Pdf(Base):
    """Extract text and images from a PDF file using PyMuPDF (``fitz``).

    The class reads ``self.file`` (path of the PDF) and ``self.param``
    (a mapping holding at least ``'path'`` and optionally ``'host'``).
    NOTE(review): both attributes, as well as ``getPath`` and
    ``removeDomains``, are expected to come from ``Base`` -- confirm.
    """

    # A block whose top edge is above 2% of the page height, or whose bottom
    # edge is below 98% of it, is skipped (treated as header/footer area).
    _HEADER_LIMIT = 0.02
    _FOOTER_LIMIT = 0.98
    # Zoom factor used when rendering the per-page cover thumbnail.
    _COVER_SCALE = 0.3
    # Embedded images smaller than this many bytes are ignored.
    _MIN_IMAGE_BYTES = 100
    # Control characters removed by clean_text(): 0x00-0x08, 0x0B, 0x0C,
    # 0x0E-0x1F and 0x7F (tab, LF and CR are kept).
    _CONTROL_CHARS = dict.fromkeys(
        [*range(0x00, 0x09), 0x0B, 0x0C, *range(0x0E, 0x20), 0x7F]
    )

    def json(self):
        """Return the PDF content as a JSON-serializable dict.

        Returns:
            ``False`` when ``self.file`` is empty. Otherwise a dict
            ``{'total': <page count>, 'pages': [...]}`` where every non-blank
            page contributes ``{'cover': <thumbnail path or "">,
            'content': [<text/image items>]}``.

        Side effects:
            Writes one cover PNG per non-blank page and one file per embedded
            image below ``self.param['path']``.
        """
        if not self.file:
            return False
        self.getPath()
        doc = fitz.open(self.file)
        try:
            result = {'total': doc.page_count, 'pages': []}
            for page_num in range(doc.page_count):
                page_obj = doc.load_page(page_num)
                # Extracted once and reused for both the blank-page check and
                # the content extraction.
                blocks = page_obj.get_text("dict", sort=True)["blocks"]
                if self._is_blank_page(page_obj, blocks):
                    continue
                result['pages'].append({
                    "cover": self._save_cover(page_obj, page_num),
                    "content": self._page_items(blocks, page_obj.rect.height, page_num),
                })
        finally:
            # Always release the underlying file handle.
            doc.close()
        return result

    def _is_blank_page(self, page_obj, blocks):
        """Return True when the page has neither text/image blocks nor plain text."""
        if any(b['type'] in (0, 1) for b in blocks):
            return False
        return not page_obj.get_text().strip()

    def _public_path(self, file_path):
        """Map a file under ``Demeter.path + 'runtime/'`` to ``param['host']`` if set."""
        if 'host' in self.param and self.param['host']:
            return file_path.replace(Demeter.path + 'runtime/', self.param['host'])
        return file_path

    def _save_cover(self, page_obj, page_num):
        """Render a thumbnail of the page and return its path, or "" on failure."""
        # Best effort: a failed thumbnail must not abort the whole extraction,
        # so any error is reported and an empty cover is returned.
        try:
            scale = self._COVER_SCALE
            pix = page_obj.get_pixmap(matrix=fitz.Matrix(scale, scale))
            cover_file = f"{self.param['path']}cover_page_{page_num+1}.png"
            pix.save(cover_file)
            return self._public_path(cover_file)
        except Exception as e:
            print(f"封面图失败: {e}")
            return ""

    def _page_items(self, blocks, page_height, page_num):
        """Convert the page's blocks into a list of text/image item dicts."""
        top_limit = page_height * self._HEADER_LIMIT
        bottom_limit = page_height * self._FOOTER_LIMIT
        page_items = []
        # The index counts every block (including skipped ones) because it is
        # part of the image file name.
        for i, b in enumerate(blocks):
            y_top = b["bbox"][1]
            y_bottom = b["bbox"][3]
            if y_top < top_limit or y_bottom > bottom_limit:
                continue
            if b['type'] == 0:
                item = self._text_item(b)
            elif b['type'] == 1:
                item = self._image_item(b, page_num, i)
            else:
                item = None
            if item:
                page_items.append(item)
        return page_items

    def _text_item(self, block):
        """Build a text item from a text block, or None when it is empty."""
        raw = "\n".join(
            "".join(span["text"] for span in line["spans"])
            for line in block["lines"]
        ).strip()
        text_content = self.clean_text(self.removeDomains(raw))
        if not text_content:
            return None
        return {
            "type": "text",
            "content": text_content,
            "pos": block["bbox"],
        }

    def _image_item(self, block, page_num, index):
        """Write an image block to disk and return its item, or None if too small."""
        image_bytes = block.get("image", b"")
        if not image_bytes or len(image_bytes) < self._MIN_IMAGE_BYTES:
            return None
        # The raw bytes are written unchanged, so the extension must be the
        # image's real format as reported by PyMuPDF; fall back to "png".
        image_ext = block.get("ext") or "png"
        image_file = f"{self.param['path']}page{page_num+1}_img_{index}.{image_ext}"
        with open(image_file, "wb") as f:
            f.write(image_bytes)
        return {
            "type": "image",
            "ext": image_ext,
            "content": self._public_path(image_file),
            "pos": block["bbox"],
        }

    # Extract the content as langchain Document objects.
    def doc(self):
        """Return the PDF as per-page langchain ``Document`` objects.

        Every page's plain text is combined with the text recognised (OCR) in
        the images of that page.

        Returns:
            ``False`` when ``self.file`` is empty. Otherwise a dict
            ``{'page': <page count>, 'content': [Document, ...]}``.

        NOTE(review): ``Image``, ``ocr`` and ``langchain`` are not imported in
        the visible part of this file (the PIL import is commented out) --
        confirm they are provided by ``from .__load__ import *``.
        """
        if not self.file:
            return False
        import io

        doc = fitz.open(self.file)
        try:
            # The original read an undefined ``page`` variable here; the page
            # count is assumed to be the intended value -- TODO confirm.
            result = {'page': doc.page_count, 'content': []}
            for page_num in range(doc.page_count):
                page = doc.load_page(page_num)
                # Plain text of the page.
                text = page.get_text()
                # Text found inside the page's images.
                image_texts = []
                for img in page.get_images(full=True):
                    xref = img[0]
                    base_image = doc.extract_image(xref)
                    image = Image.open(io.BytesIO(base_image["image"]))
                    ocr_result = ocr.ocr(image)
                    # Skip images for which the OCR engine returned nothing.
                    if not ocr_result or not ocr_result[0]:
                        continue
                    image_texts.extend(line[1] for line in ocr_result[0])
                # Page text followed by the image texts, one per line.
                full_text = text.strip() + "\n" + "\n".join(image_texts)
                document = langchain.schema.Document(page_content=full_text)
                result['content'].append(document)
        finally:
            # Always release the underlying file handle.
            doc.close()
        return result

    def clean_text(self, s):
        """Return ``s`` with ASCII control characters removed.

        Tab, line feed and carriage return are preserved.
        """
        return s.translate(self._CONTROL_CHARS)
|