Create Python API (#457)

* 更新镜像

* 更新镜像信息

* 更新镜像信息

* Create openai_api.py

* Create requirements.txt

* Create README.md

* 添加python接口

* Delete python directory

* Create README.md

* Create Python API

* 文件结构化

* 文件结构化
This commit is contained in:
不做了睡大觉
2023-11-09 11:52:53 +08:00
committed by GitHub
parent 8bb5588305
commit 9f889d8806
9 changed files with 467 additions and 0 deletions

View File

@@ -0,0 +1,90 @@
import requests
import bs4
import nltk
from urllib.parse import urljoin
from time import sleep
import time
import math
# 全局变量来记录开始时间
start_time = time.time()
# 你可以设定一个最大运行时长比如60秒
max_run_time = 20
# 添加一个简单的IDF计算器
class SimpleIDFCalculator:
def __init__(self):
self.doc_freq = {}
self.num_docs = 0
def add_document(self, doc):
self.num_docs += 1
words = set(nltk.word_tokenize(doc))
for word in words:
if word in self.doc_freq:
self.doc_freq[word] += 1
else:
self.doc_freq[word] = 1
def idf(self, word):
return math.log(self.num_docs / (1 + self.doc_freq.get(word, 0)))
# 定义一个函数,用于获取网页的内容,并进行总结
def get_summary(url, level):
result = []
visited = set()
idf_calculator = SimpleIDFCalculator()
helper(url, level, result, visited, idf_calculator)
return result
# 辅助函数
def helper(url, level, result, visited, idf_calculator):
# # 检查是否超出运行时间限制
# if time.time() - start_time > max_run_time:
# print("Reached max run time, exiting...")
# return
if level == 0 or url in visited or not url.startswith("http"):
return
visited.add(url)
try:
response = requests.get(url)
if response.status_code != 200:
return
soup = bs4.BeautifulSoup(response.text, "html.parser")
title = soup.title.string if soup.title else 'No Title'
text = soup.get_text().strip()
idf_calculator.add_document(text)
sentences = nltk.sent_tokenize(text)
words = nltk.word_tokenize(text)
scores = {}
for sentence in sentences:
for word in nltk.word_tokenize(sentence):
tf = words.count(word) / len(words)
idf = idf_calculator.idf(word)
scores[sentence] = scores.get(sentence, 0) + (tf * idf)
summary = " ".join(sorted(scores, key=scores.get, reverse=True)[:10])
result.append((url, title, summary))
sleep(1) # Simple delay to prevent aggressive crawling
links = soup.find_all("a")
for link in links:
href = link.get("href")
if href:
# Handle relative links
next_url = urljoin(url, href)
helper(next_url, level - 1, result, visited, idf_calculator)
except Exception as e:
print(f"Error processing {url}: {e}")
# # 主程序部分,仅作为函数调用示例:
# summary = get_summary('https://zhihu.com', 2)
# print(summary)

View File

@@ -0,0 +1,93 @@
import os
import docx
from aip import AipOcr
from io import BytesIO
from PyPDF2 import PdfReader
from pdf2image import convert_from_path
# 百度OCR API设置
APP_ID = os.environ.get('APP_ID','xxx')
API_KEY = os.environ.get('API_KEY','xxx')
SECRET_KEY = os.environ.get('SECRET_KEY','xxx')
client = AipOcr(APP_ID, API_KEY, SECRET_KEY)
def ocr_image(image_data):
result = client.basicGeneral(image_data)
text = ''
if 'words_result' in result:
for item in result['words_result']:
text += item['words'] + '\n'
return text
def process_pdf(file_path):
pdf = PdfReader(file_path)
num_pages = len(pdf.pages)
text = ''
for page_num in range(num_pages):
page = pdf.pages[page_num]
text += f'--------------------------------------------\n'
text += f'文档名:{os.path.basename(file_path)}\n'
text += f'页数:{page_num + 1}\n'
text += f'该页内容:\n'
text += page.extract_text() + '\n'
images = convert_from_path(file_path, first_page=page_num + 1, last_page=page_num + 1)
for image in images:
image_data = BytesIO()
image.save(image_data, format='PNG')
image_data = image_data.getvalue()
ocr_text = ocr_image(image_data)
if ocr_text:
text += f'图片文字:\n'
text += ocr_text + '\n'
text += '--------------------------------------------\n'
return text
def process_doc(file_path):
doc = docx.Document(file_path)
text = ''
page_num = 1
for paragraph in doc.paragraphs:
if paragraph.text.strip() == '': # 简单地将空行视为分页符
page_num += 1
else:
text += f'--------------------------------------------\n'
text += f'文档名:{os.path.basename(file_path)}\n'
text += f'页数:{page_num}\n'
text += f'该页内容:\n'
text += paragraph.text + '\n'
for shape in doc.inline_shapes:
if shape.type == docx.enum.shape.WD_INLINE_SHAPE.PICTURE:
blip_id = shape._inline.graphic.graphicData.pic.blipFill.blip.embed
image_part = doc.part.related_parts[blip_id]
image_data = image_part.blob
ocr_text = ocr_image(image_data)
if ocr_text:
text += f'图片文字:\n'
text += ocr_text + '\n'
return text
def process_txt(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
text = f.read()
return text
def office_to_txt(file_path):
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext == '.docx':
return process_doc(file_path)
elif file_ext == '.pdf':
return process_pdf(file_path)
elif file_ext == '.doc':
return process_doc(file_path)
elif file_ext == '.txt':
return process_txt(file_path)
else:
raise ValueError('Unsupported file format')