import os
import re
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

import oss2
import requests
def generate_file_name():
    """Build a fresh local file name for a downloaded image.

    Returns:
        A string of the form ``<YYYYmmddHHMMSS>_<5 chars>.jpg``.  The
        extension is always ``.jpg``; the image's real format is not
        inspected.
    """
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    # The first five characters of a random UUID keep names distinct when
    # several images are saved within the same second.
    suffix = str(uuid.uuid4())[:5]
    return "{}_{}.jpg".format(timestamp, suffix)
def get_pic_url(filename):
    """Download the images referenced by a Markdown file and re-host them.

    The file is scanned for Markdown image links (``![alt](url)``) and HTML
    ``<img src="url">`` tags.  Each distinct URL that does not already contain
    ``your_pic_domain`` is downloaded into the module-level ``pic_path``
    directory and uploaded with ``upload_pic_to_ali_oss``.  A URL that fails
    at any step is printed and appended to ``error_images.text``; the
    remaining URLs are still processed.

    Args:
        filename: Path of the Markdown file to scan (read as UTF-8).

    Returns:
        dict mapping each successfully re-hosted original URL to its new URL.
    """
    url_map = {}
    with open(filename, 'r', encoding='utf-8') as f:
        content = f.read()

    img_pattern = r'!\[.*?\]\((.*?)\)|<img.*?src=[\'\"](.*?)[\'\"].*?>'
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML,'
                      ' like Gecko) Chrome/55.0.2883.87 Safari/537.36',
        # NOTE(review): presumably sent to satisfy hot-link checks on the
        # image host -- confirm.
        'referer': 'https://weibo.com/'
    }

    seen = set()
    # The pattern has two alternatives, so findall yields (markdown, html)
    # tuples in which exactly one element is non-empty for a given match.
    for md_url, html_url in re.findall(img_pattern, content):
        url = md_url or html_url
        if not url or url in seen:
            continue
        seen.add(url)
        if "your_pic_domain" in url:
            # Already hosted on the target domain; nothing to do.
            continue

        print("图片原url:", url)
        try:
            # A timeout keeps one stalled download from blocking its worker
            # thread forever.
            response = requests.get(url, headers=headers, timeout=30)
            # Do not store and upload an HTTP error page as if it were an image.
            response.raise_for_status()

            pic_name = "{}/{}".format(pic_path, generate_file_name())
            print("saving image " + url + " to: " + pic_name)
            with open(pic_name, 'wb') as f2:
                f2.write(response.content)

            new_pic_url = upload_pic_to_ali_oss(pic_name)
            if not new_pic_url:
                raise Exception("upload image failed.")
            print("图片新url:", new_pic_url)
            url_map[url] = new_pic_url
        except Exception as e:
            # Best effort: record the failure and carry on with the next URL.
            print("文件:", filename, "url处理失败:", url, e)
            with open("error_images.text", 'a', encoding='utf-8') as ef:
                ef.write("文件: " + filename + ", url处理失败: " + url + "\n")

    print(url_map)
    return url_map
def list_file(files, path):
    """Recursively collect the Markdown files found below *path*.

    Args:
        files: List that matching paths are appended to (mutated in place).
        path: Directory to scan.

    Returns:
        The same ``files`` list, extended with every path under ``path``
        that ends with ``.md``.
    """
    for entry in os.listdir(path):
        entry_path = os.path.join(path, entry)
        if os.path.isdir(entry_path):
            # Descend into the sub-directory, accumulating into the same list.
            list_file(files, entry_path)
            continue
        if entry_path.endswith(".md"):
            files.append(entry_path)
    return files
def upload_pic_to_ali_oss(file_path):
    """Upload a local file to an Ali OSS bucket and return its new URL.

    The object key is the literal prefix ``bucket_path`` followed by the
    file's base name; the returned URL is ``https://your_domain`` followed by
    that key.
    NOTE(review): the credentials, bucket name, key prefix and domain look
    like placeholders to be filled in before use -- confirm.

    Args:
        file_path: Path of the local file to upload.

    Returns:
        The URL string on success, or ``None`` if the upload raised.
    """
    credentials = oss2.Auth('token', 'token')
    oss_bucket = oss2.Bucket(credentials, 'http://oss-cn-shanghai.aliyuncs.com', 'your_bucket_name')
    try:
        print("uploading " + file_path)
        object_key = 'bucket_path' + os.path.basename(file_path)
        oss_bucket.put_object_from_file(object_key, file_path)
    except Exception as e:
        # Failure is reported through the return value; the caller decides
        # how to record it.
        print('upload image to ali oss failed:', e)
        return None
    return 'https://your_domain' + object_key
def modify_md(filename, url_map):
    """Rewrite image URLs inside a Markdown file in place.

    Every occurrence of each key of ``url_map`` in the file is replaced by the
    corresponding value.  The file is read once and written back once, and
    only if at least one replacement changed the text.  Any error is printed
    rather than raised.

    Args:
        filename: Path of the Markdown file to update (read/written as UTF-8).
        url_map: dict mapping original image URLs to their replacement URLs.
    """
    try:
        with open(filename, "r", encoding="utf-8") as f:
            original = f.read()

        content = original
        # Replace longer URLs first: if one URL is a prefix of another (for
        # example the same address with and without a query string),
        # substituting the shorter one first would corrupt the longer link.
        for url in sorted(url_map, key=len, reverse=True):
            content = content.replace(url, url_map[url])

        # A single write after all substitutions avoids truncating and
        # rewriting the file once per URL.
        if content != original:
            with open(filename, "w", encoding="utf-8") as f:
                f.write(content)
    except Exception as e:
        print(filename, '文件修改失败:', e)
def run(file):
    """Process one Markdown file: re-host its images, then rewrite its links.

    Calls ``get_pic_url`` to download and upload the file's images and, when
    at least one image was re-hosted, ``modify_md`` to substitute the new
    URLs into the file.

    Args:
        file: Path of the Markdown file to process.
    """
    # The upload target is Ali OSS (see upload_pic_to_ali_oss), so the
    # progress message names it rather than a different provider.
    print("[ " + file + " ]" + ": download images and upload to ali oss.")
    url_map = get_pic_url(file)
    if url_map:
        print("[ " + file + " ]" + ": replace images.")
        modify_md(file, url_map)
def main(path):
    """Process every Markdown file below *path* using a pool of four threads.

    Args:
        path: Root directory that ``list_file`` scans for ``.md`` files.
    """
    files = list_file([], path)
    if not files:
        print("no markdown found, exit")
        return

    # Leaving the ``with`` block waits for every submitted task to finish and
    # releases the worker threads even if submission itself fails.
    with ThreadPoolExecutor(4) as th_pool:
        futures = {th_pool.submit(run, file): file for file in files}

    # An exception raised inside a worker is stored on its future and would
    # otherwise disappear silently; report it per file.
    for future, file in futures.items():
        error = future.exception()
        if error is not None:
            print("[ " + file + " ]" + ": processing failed:", error)
if __name__ == "__main__":
    # Directory tree that is scanned for Markdown files.
    md_path = "./md"
    # Module-level name read by get_pic_url() as the image download directory.
    pic_path = "./pic"
    # exist_ok=True replaces the check-then-create sequence, which could fail
    # if the directory appeared between the check and the call.
    os.makedirs(pic_path, exist_ok=True)
    main(md_path)