# -*- coding: utf-8 -*-
# NOTE: the star import stays first so that the explicit imports below keep
# taking precedence over anything it re-exports (same order as before).
from .__load__ import *
import fitz
import io
import re
from collections import Counter

# A text block whose top edge lies above this fraction of the page height is
# treated as a header candidate.
_HEADER_BAND = 0.05
# A text block whose bottom edge lies below this fraction of the page height
# is treated as a footer candidate.
_FOOTER_BAND = 0.95
# Zoom factor used when rendering the per-page thumbnail ("cover").
_COVER_SCALE = 0.3
# Embedded images smaller than this many bytes are ignored.
_MIN_IMAGE_BYTES = 100
# ASCII control characters except tab, line feed and carriage return.
_CONTROL_CHARS_RE = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')


class Pdf(Base):
    """Extract text and images from a PDF file with PyMuPDF.

    Relies on ``self.file``, ``self.param``, ``self.getPath()`` and
    ``self.removeDomains()``, none of which are defined in this module
    (presumably provided by ``Base`` -- verify against the base class).
    """

    def json(self):
        """Return a per-page breakdown of the PDF as plain dicts.

        Returns:
            ``False`` when ``self.file`` is empty. Otherwise
            ``{'total': <page count>, 'pages': [...]}`` where each page entry
            is ``{'cover': <thumbnail path or "">, 'content': [...]}`` and
            each content item is either
            ``{'type': 'text', 'content': str, 'pos': bbox}`` or
            ``{'type': 'image', 'ext': str, 'content': path, 'pos': bbox}``.
            Pages with no text and no text/image blocks are skipped, so
            ``len(result['pages'])`` can be smaller than ``result['total']``.

        Side effects:
            Writes one thumbnail PNG per non-empty page and one file per
            embedded image under ``self.param['path']``.
        """
        if not self.file:
            return False
        self.getPath()

        # The context manager closes the document even if extraction fails.
        with fitz.open(self.file) as doc:
            result = {'total': doc.page_count, 'pages': []}
            repeated_texts = self._repeated_margin_texts(doc)

            for page_num, page_obj in enumerate(doc):
                text = page_obj.get_text().strip()
                blocks = page_obj.get_text("dict", sort=True)["blocks"]
                has_visible_content = any(b['type'] in (0, 1) for b in blocks)
                # Skip blank pages.
                if not text and not has_visible_content:
                    continue

                cover_file = self._save_cover(page_obj, page_num)
                page_items = self._page_items(blocks, page_num, repeated_texts)
                result['pages'].append({
                    "cover": cover_file,
                    "content": page_items,
                })
        return result

    def doc(self):
        """Extract each page as a langchain ``Document``.

        The page's own text is combined with text recognised (OCR) in the
        images embedded on that page.

        Returns:
            ``False`` when ``self.file`` is empty, otherwise
            ``{'page': <page count>, 'content': [Document, ...]}``.

        NOTE(review): the previous code read ``page`` before assigning it, so
        every call raised ``UnboundLocalError``. Storing the page count under
        ``'page'`` is the assumed intent -- confirm with callers.
        NOTE(review): ``Image``, ``ocr`` and ``langchain`` are not imported in
        this module; they are expected to come from the star import.
        """
        if not self.file:
            return False

        with fitz.open(self.file) as doc:
            result = {'page': doc.page_count, 'content': []}
            for page in doc:
                text = page.get_text()

                # Collect text recognised inside the page's embedded images.
                image_texts = []
                for img in page.get_images(full=True):
                    xref = img[0]
                    base_image = doc.extract_image(xref)
                    image = Image.open(io.BytesIO(base_image["image"]))
                    ocr_result = ocr.ocr(image)
                    # Guard against an empty / None result for images in
                    # which nothing was recognised.
                    lines = ocr_result[0] if ocr_result else None
                    for line in lines or []:
                        recognised = line[1]
                        # NOTE(review): some OCR engines return
                        # (text, confidence) here instead of a bare string;
                        # keep only the text so the join below works.
                        if isinstance(recognised, (tuple, list)):
                            recognised = recognised[0]
                        image_texts.append(recognised)

                # Page text followed by the image text, one entry per line.
                full_text = text.strip() + "\n" + "\n".join(image_texts)
                document = langchain.schema.Document(page_content=full_text)
                result['content'].append(document)
        return result

    def clean_text(self, s):
        """Return ``s`` with ASCII control characters removed.

        Tab, line feed and carriage return are kept.
        """
        return _CONTROL_CHARS_RE.sub('', s)

    @staticmethod
    def _block_text(block, line_sep=""):
        """Join the span texts of a text block, adding ``line_sep`` after each line."""
        return "".join(
            "".join(span["text"] for span in line["spans"]) + line_sep
            for line in block["lines"]
        ).strip()

    def _repeated_margin_texts(self, doc):
        """Return the set of header/footer band texts that occur more than once."""
        counts = Counter()
        for page_obj in doc:
            page_height = page_obj.rect.height
            blocks = page_obj.get_text("dict", sort=True)["blocks"]
            for b in blocks:
                if b['type'] != 0:
                    continue
                text_content = self._block_text(b)
                if not text_content:
                    continue
                y_top = b["bbox"][1]
                y_bottom = b["bbox"][3]
                # Only the top and bottom bands of the page are considered.
                if (y_top < page_height * _HEADER_BAND
                        or y_bottom > page_height * _FOOTER_BAND):
                    counts[text_content] += 1
        return {text for text, count in counts.items() if count > 1}

    def _public_path(self, local_path):
        """Map a runtime file path to its hosted form when a host is configured."""
        if 'host' in self.param and self.param['host']:
            return local_path.replace(Demeter.path + 'runtime/', self.param['host'])
        return local_path

    def _save_cover(self, page_obj, page_num):
        """Render a page thumbnail and return its path, or "" on failure."""
        # Best effort: a failed thumbnail must not abort the extraction.
        try:
            pix = page_obj.get_pixmap(matrix=fitz.Matrix(_COVER_SCALE, _COVER_SCALE))
            cover_file = f"{self.param['path']}cover_page_{page_num+1}.png"
            pix.save(cover_file)
            return self._public_path(cover_file)
        except Exception as e:
            print(f"封面图失败: {e}")
            return ""

    def _page_items(self, blocks, page_num, repeated_texts):
        """Convert a page's text and image blocks into content items."""
        page_items = []
        for i, b in enumerate(blocks):
            if b['type'] == 0:
                text_content = self._block_text(b, "\n")
                text_content = self.clean_text(self.removeDomains(text_content))
                if not text_content:
                    continue
                # Drop repeated headers/footers. The flat form (no line
                # breaks, uncleaned) is what the pre-scan recorded, so it is
                # checked too; otherwise multi-line headers never match.
                if (text_content in repeated_texts
                        or self._block_text(b) in repeated_texts):
                    continue
                page_items.append({
                    "type": "text",
                    "content": text_content,
                    "pos": b["bbox"],
                })
            elif b['type'] == 1:
                image_bytes = b.get("image", b"")
                if not image_bytes or len(image_bytes) < _MIN_IMAGE_BYTES:
                    continue
                # The bytes are written unchanged, so use the block's real
                # format for the extension instead of always claiming PNG.
                image_ext = b.get("ext") or "png"
                image_file = f"{self.param['path']}page{page_num+1}_img_{i}.{image_ext}"
                with open(image_file, "wb") as f:
                    f.write(image_bytes)
                page_items.append({
                    "type": "image",
                    "ext": image_ext,
                    "content": self._public_path(image_file),
                    "pos": b["bbox"],
                })
        return page_items