1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
"""
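Install the dependencies before use:
pip install markdown beautifulsoup4 requests oss2
What this tool does:
1. Reads a markdown file.
2. Extracts the image links it contains.
3. For a remote image (http/https URL) whose domain is not listed in EXCLUDE_TARGET_DOMAIN, downloads the image and uploads it to an OSS bucket.
4. For a local image, uploads it to OSS directly.
5. Replaces the image links and writes the updated markdown back to the file.
Usage: python tool.py markdown1.md markdown2.md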
"""
# coding: utf-8
# author: roderick
# date: 2024-01-10
import os
import shutil
import tempfile
import uuid
from urllib.parse import urlparse

import markdown
import oss2
import requests
from bs4 import BeautifulSoup
# Domains to exclude: images hosted on these domains are left untouched.
EXCLUDE_TARGET_DOMAIN = ["img.roderickchan.cn", "xxxxx.oss-cn-hangzhou.aliyuncs.com"]
# URL prefix under which the uploaded images are served.
NEW_URL_FOR_IMAGE = "https://xxxxx.oss-cn-hangzhou.aliyuncs.com"
# Key prefix for stored images, e.g. everything is kept under the "img" folder.
IMAGE_PREFIX = "img/" # New image links look like: NEW_URL_FOR_IMAGE/IMAGE_PREFIX/img_filename
# Recommended: use a restricted sub-account that is only granted OSS read/write permission.
OSS_ACCESS_KEY_ID = ""
OSS_ACCESS_KEY_SECRET = ""
OSS_ENDPOINT = "https://oss-cn-hangzhou.aliyuncs.com"
BUCKET_NAME = "xxxxx"
# Aliyun OSS bucket client, created once at import time and used by every upload below.
bucket = oss2.Bucket(oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET), OSS_ENDPOINT, BUCKET_NAME)
# Join path components and normalise every backslash to a forward slash.
def joinpath(*args, **kwargs):
    """Return ``os.path.join(*args, **kwargs)`` with "\\" replaced by "/".

    The result is therefore usable as a URL path or an object key even when
    the platform's path separator is a backslash.
    """
    joined = os.path.join(*args, **kwargs)
    # Splitting on the backslash and re-joining with "/" swaps every occurrence.
    return "/".join(joined.split("\\"))
# Each instance processes exactly one markdown file.
class MarkdownFileProcessor:
    """Re-host the images referenced by a single markdown file on Aliyun OSS.

    Remote images (http/https, host not in ``EXCLUDE_TARGET_DOMAIN``) are
    downloaded and uploaded to the bucket; local images are uploaded
    directly.  The markdown file is then rewritten in place with the new
    links, optionally after saving a ``.bk`` copy next to it.
    """

    # Seconds to wait on a remote image server before giving up, so that one
    # unresponsive host cannot hang the whole run.
    DOWNLOAD_TIMEOUT = 30

    def __init__(self, filepath: str, backup=True) -> None:
        """Remember the target file and whether to back it up.

        Args:
            filepath: Path of the markdown file to process.
            backup: When true, the file is copied to ``<filepath>.bk``
                before it is rewritten.
        """
        self.get_filename = self._outer()
        self.markdown_filepath = filepath
        self.backup = backup

    def _outer(self):
        """Return a callable producing "<sequence>-<uuid4>" names.

        The sequence starts at 1 and is private to the returned callable, so
        every processor instance numbers its uploads independently.
        """
        start = 0

        def _inner():
            nonlocal start
            start += 1
            return f"{start}-{uuid.uuid4()}"

        return _inner

    @staticmethod
    def _filename_from_url(url):
        """Return the last path segment of *url*, without query or fragment.

        Falls back to ``"image"`` when the URL path has no final segment
        (for example when it ends with "/").
        """
        name = os.path.basename(urlparse(url).path)
        return name or "image"

    def _resolve_local_path(self, src):
        """Return an existing path for a local image reference, if any.

        *src* is tried as given first (relative to the working directory);
        if that does not exist it is tried relative to the directory of the
        markdown file.  When neither exists *src* is returned unchanged so
        that the uploader reports it as missing.
        """
        if os.path.exists(src):
            return src
        markdown_dir = os.path.dirname(os.path.abspath(self.markdown_filepath))
        candidate = os.path.join(markdown_dir, src)
        return candidate if os.path.exists(candidate) else src

    def _download_image(self, url, filename):
        """Download *url* with an HTTP GET and save the body to *filename*.

        Raises:
            requests.RequestException: On connection errors, timeouts or an
                HTTP error status, so an error page is never stored as an
                image.
            OSError: If *filename* cannot be written.
        """
        with requests.get(url, stream=True, timeout=self.DOWNLOAD_TIMEOUT) as response:
            response.raise_for_status()
            with open(filename, 'wb') as out_file:
                # Write chunk by chunk instead of buffering the whole body.
                for chunk in response.iter_content(chunk_size=65536):
                    out_file.write(chunk)
        print(f"[+++] [{self.markdown_filepath}] Download_image and save: {filename}")

    def _upload_to_oss(self, rename=True, *filenames):
        """Upload local files to the OSS bucket.

        Args:
            rename: When true, each file is stored under a fresh
                "<sequence>-<uuid4>" name that keeps the original extension;
                otherwise the file's base name is kept.
            *filenames: Local paths to upload.

        Returns:
            A dict mapping every given path to its new URL, or to the path
            itself (backslashes normalised to "/") when the file does not
            exist or the upload failed.
        """
        src_dict = {}
        for fn in filenames:
            if rename:
                nn = self.get_filename() + os.path.splitext(fn)[1]
            else:
                nn = os.path.basename(fn)
            if not os.path.exists(fn):
                src_dict[fn] = fn.replace("\\", "/")
                print(f"[!!!] [{self.markdown_filepath}] file {fn} not exist!!!")
                continue
            try:
                bucket.put_object_from_file(joinpath(IMAGE_PREFIX, nn), fn)
            except Exception as exc:
                # Best effort: report the failure and carry on with the other
                # files.  ``Exception`` (not a bare except) lets Ctrl-C through.
                src_dict[fn] = fn.replace("\\", "/")
                print(f"[!!!] [{self.markdown_filepath}] bucket.put_object_from_file({os.path.join(IMAGE_PREFIX, nn)}, {fn}) error!!!")
                print(exc)
                continue
            src_dict[fn] = joinpath(NEW_URL_FOR_IMAGE, IMAGE_PREFIX, nn)
            print(f"[+++] [{self.markdown_filepath}] bucket.put_object_from_file({os.path.join(IMAGE_PREFIX, nn)}, {fn}) done.")
        return src_dict

    def _process_markdown(self):
        """Collect the image references of the markdown file.

        Returns:
            A ``(need_download, need_push)`` tuple of string lists: the
            http/https URLs whose host is not excluded, and the references
            without a URL scheme (local files).  Each reference is listed
            once, in document order.
        """
        with open(self.markdown_filepath, 'r', encoding='utf-8') as f:
            text = f.read()
        need_download = []
        need_push = []
        seen = set()
        soup = BeautifulSoup(markdown.markdown(text), features="html.parser")
        for img in soup.find_all('img'):
            src = img.get('src')
            # Skip tags without a usable src and references already collected.
            if not src or src in seen:
                continue
            seen.add(src)
            parsed_url = urlparse(src)
            if parsed_url.scheme in ('http', 'https'):
                if parsed_url.netloc not in EXCLUDE_TARGET_DOMAIN:
                    need_download.append(src)
            elif not parsed_url.scheme:
                need_push.append(src)
        return need_download, need_push

    def _process_img(self, need_download: list, need_push: list):
        """Upload the collected images and rewrite the markdown file.

        Args:
            need_download: Remote image URLs to fetch and re-host.
            need_push: Local image references to upload.

        Neither list is modified.  A download that fails is reported and
        skipped, leaving its original link in place.  The file's line
        endings are preserved.
        """
        # Maps the local file handed to the uploader to the exact reference
        # text that appears in the markdown source.
        sources = {}
        for src in need_push:
            sources[self._resolve_local_path(src)] = src

        # Downloads go to a private temporary directory under unique names so
        # they can neither overwrite each other nor a local image, and are
        # removed again once uploaded.
        tmp_dir = tempfile.mkdtemp(prefix="md-images-")
        try:
            for index, url in enumerate(dict.fromkeys(need_download)):
                extension = os.path.splitext(self._filename_from_url(url))[1]
                local_path = os.path.join(tmp_dir, f"download-{index}{extension}")
                try:
                    self._download_image(url, local_path)
                except (requests.RequestException, OSError) as exc:
                    print(f"[!!!] [{self.markdown_filepath}] download {url} error!!!")
                    print(exc)
                    continue
                sources[local_path] = url
            new_src = self._upload_to_oss(True, *sources)
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)

        if self.backup:
            shutil.copyfile(self.markdown_filepath, self.markdown_filepath + ".bk")

        # newline="" disables newline translation so CRLF files stay CRLF.
        with open(self.markdown_filepath, "rt", encoding='utf-8', errors="ignore", newline="") as f:
            data = f.read()

        # Replace longer references first so that a reference which is a
        # substring of another one cannot corrupt it.
        for local_path in sorted(new_src, key=lambda p: len(sources[p]), reverse=True):
            reference = sources[local_path]
            replacement = new_src[local_path]
            if replacement == local_path.replace("\\", "/"):
                # Missing file or failed upload: keep pointing at the original
                # reference (only normalising backslashes) rather than at a
                # temporary or resolved local path.
                replacement = reference.replace("\\", "/")
            data = data.replace(reference, replacement)

        with open(self.markdown_filepath, "wt", encoding='utf-8', errors="ignore", newline="") as f:
            f.write(data)

    def run(self):
        """Process the markdown file, reporting instead of raising on error."""
        try:
            self._process_img(*self._process_markdown())
            print(f"[+++] [{self.markdown_filepath}] Process done.")
        except Exception as e:
            print(f"[!!!] [{self.markdown_filepath}] Process error!!!")
            print(e)
def is_markdown_file(path):
    """Return True when *path* is an existing regular file ending in ".md".

    The extension check is case-insensitive, so ".md" and ".MD" both match.
    """
    return os.path.isfile(path) and path.lower().endswith(".md")


if __name__ == "__main__":
    import sys

    all_files = sys.argv[1:]
    # Validate explicitly instead of with ``assert``: assertions are stripped
    # under ``python -O``, and ``str.endswith`` rejects a list of suffixes.
    if not all(is_markdown_file(x) for x in all_files):
        sys.exit("请保证文件存在,且以.md结尾!")
    for x in all_files:
        s = MarkdownFileProcessor(x)
        s.run()
|