使用python批量处理markdown文件中的图片

记录一个小工具,用于批量处理 markdown 文件中的图片,将其下载下来,然后上传到 aliyun OSS 存储中。

使用场景

我目前遇到过四个场景。

场景一

前段时间在使用语雀的时候,把语雀笔记导出为 markdown 后,发现导出后的 markdown 文件中的图片链接指向的是语雀的 CDN,也就是说,这些图片还是存储在语雀的服务器上的。博客的资源当然是存储一份到自己的本地是最安全的,所以我需要把这些图片下载到本地,然后上传到我自己的 oss 存储中。

场景二

写博客的时候,先把图片存储在本地,然后配合 Picgo-Core 使用 Typora 的图片上传。但是,这个工具的批量上传有一些 bug,图片链接的顺序是乱的。图片少的话还好,图片比较多的话,那真的将是灾难。

场景三

在场景一的基础上,把下载下来的 markdown 又添加了一些新的内容,就导致了有一些图片是存储在别的服务器上,有一些图片是存储在本地。但是,我最后都需要把这些图片上传到 oss 中。

场景四

把存储在 A 服务器上的图片全部下载后上传到 B 服务器,然后替换图片的 url

工具脚本

工具的处理逻辑如下:

  1. 读取 markdown 文件
  2. 提取出 markdown 中的图片链接
  3. 如果图片是 http 开头的网络图片,且域名不是 xxx.com,那么下载图片,然后上传到 oss 的一个 bucket
  4. 如果图片是本地图片,直接上传到 oss
  5. 替换图片链接得到新的 md 文件

工具的脚本如下:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""
使用前请先安装:
pip install markdown beautifulsoup4 requests oss2

本工具的功能如下:

1. 读取markdown文件
2. 提取出markdown中的图片链接
3. 如果图片是http开头的网络图片,且域名不是xxx.com,那么下载图片,然后上传到oss的一个bucket中
4. 如果图片是本地图片,直接上传到oss
5. 替换图片链接得到新的markdown文件


Usage: python tool.py markdown1.md markdown2.md
"""
# coding: utf-8
# author: roderick
# date: 2024-01-10

import os
import markdown
import requests
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import oss2
import uuid

# 要排除的域名,这些域名下的图片不处理
EXCLUDE_TARGET_DOMAIN = ["img.roderickchan.cn", "xxxxx.oss-cn-hangzhou.aliyuncs.com"]

# 新的访问前缀
NEW_URL_FOR_IMAGE = "https://xxxxx.oss-cn-hangzhou.aliyuncs.com"

# 存储图片的路径的前缀,比如我是存储在img文件夹下面
IMAGE_PREFIX = "img/" # 新的图片的链接为:NEW_URL_FOR_IMAGE/IMAGE_PREFIX/img_filename

# 建议使用子权限账户,只赋予OSS读写权限
OSS_ACCESS_KEY_ID = ""
OSS_ACCESS_KEY_SECRET = ""
OSS_ENDPOINT = "https://oss-cn-hangzhou.aliyuncs.com"
BUCKET_NAME = "xxxxx"

# 连接aliyun的oss
bucket = oss2.Bucket(oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET), OSS_ENDPOINT, BUCKET_NAME)

# 拼接路径,替换掉\
def joinpath(*args, **kwargs):
    return os.path.join(*args, **kwargs).replace("\\", "/")
    

# 每个实例处理一个文件
class MarkdownFileProcessor:
    def __init__(self, filepath: str, backup=True) -> None:
        self.get_filename = self._outer()
        self.markdown_filepath = filepath
        self.backup = backup

    # 使用"序号-UUID"这样的格式来重命名
    def _outer(self):
        start = 0
        def _inner():
            nonlocal start
            start += 1
            return f"{start}-{uuid.uuid4()}"
        return _inner

    # 使用get下载图片
    def _download_image(self, url, filename):
        response = requests.get(url, stream=True)
        with open(filename, 'wb') as out_file:
            out_file.write(response.content)
        print(f"[+++] [{self.markdown_filepath}] Download_image and save: {filename}")
        del response

    # 把需要上传的图片上传到oss
    def _upload_to_oss(self, rename=True, *filenames):
        src_dict = {}
        for fn in filenames:
            if rename: # 文件名
                nn = self.get_filename() + os.path.splitext(fn)[1]
            else:
                nn = os.path.basename(fn)
                
            if not os.path.exists(fn):
                src_dict[fn] = fn.replace("\\", "/")
                print(f"[!!!] [{self.markdown_filepath}] file {fn} not exist!!!")
                continue
            try:

                bucket.put_object_from_file(joinpath(IMAGE_PREFIX, nn), fn)
                src_dict[fn] = joinpath(NEW_URL_FOR_IMAGE, IMAGE_PREFIX, nn)
                print(f"[+++] [{self.markdown_filepath}] bucket.put_object_from_file({os.path.join(IMAGE_PREFIX, nn)}, {fn}) done.")
            except:
                src_dict[fn] = fn.replace("\\", "/")
                print(f"[!!!] [{self.markdown_filepath}] bucket.put_object_from_file({os.path.join(IMAGE_PREFIX, nn)}, {fn}) error!!!")
        return src_dict

    # 处理文件
    def _process_markdown(self):
        filename = self.markdown_filepath
        with open(filename, 'r', encoding='utf-8') as f:
            text = f.read()
            
        need_download = []
        need_push = []
        html = markdown.markdown(text)
        soup = BeautifulSoup(html, features="html.parser")
        for img in soup.find_all('img'):
            src = img.get('src')
            parsed_url = urlparse(src)
            if parsed_url.scheme in ['http', 'https']:
                if parsed_url.netloc not in EXCLUDE_TARGET_DOMAIN:
                    need_download.append(src)
            elif not parsed_url.scheme:
                need_push.append(src)
        
        return need_download, need_push # 这两个列表存储的都是完整的url,且都是字符串,前者是http url,后者是本地url

    # 处理图片
    def _process_img(self, need_download: list, need_push: list):
        ori_src = need_download.copy() + need_push.copy()
        use_dict = {}
        for x in need_download:
            fn = os.path.split(x)[1]
            self._download_image(x, fn)
            use_dict[fn] = x
            need_push.append(fn)
        
        new_src = self._upload_to_oss(True, *need_push)
        
        # 替换文本
        if self.backup:
            with open(self.markdown_filepath, "rb") as f1:
                with open(self.markdown_filepath+".bk", "wb") as f2:
                    f2.write(f1.read())

        data = ""
        with open(self.markdown_filepath, "rt", encoding='utf-8', errors="ignore") as f:
            data = f.read()

        # 替换
        for k, v in new_src.items():
            if k in ori_src:
                data = data.replace(k, v)
            elif k in use_dict:
                data = data.replace(use_dict[k], v)
        
        with open(self.markdown_filepath, "wt", encoding='utf-8', errors="ignore") as f:
            f.write(data)

    # 调用这个函数
    def run(self):
        try:
            self._process_img(*self._process_markdown())
            print(f"[+++] [{self.markdown_filepath}] Process done.")
        except Exception as e:
            print(f"[!!!] [{self.markdown_filepath}] Process error!!!")
            print(e)
        

if __name__ == "__main__":
    import sys
    all_files = sys.argv[1:]
    assert all(map(lambda x: os.path.exists(x) and os.path.isfile(x) and x.endswith([".md", ".MD"]), all_files)), "请保证文件存在,且以.md结尾!"
    for x in sys.argv[1:]:
        s = MarkdownFileProcessor(x)
        s.run()

目前只实现了最简单的功能,比如图片下载失败的话就会结束处理,后续可以针对该脚本进一步优化。

Buy me a coffee~
roderick 支付宝支付宝
roderick 微信微信
0%