From db3322621a714f429621d6a315936e8e3bcff28d Mon Sep 17 00:00:00 2001 From: Weibin Huang <654751191@qq.com> Date: Wed, 7 Jun 2023 09:23:30 +0800 Subject: [PATCH] v2.5.7 + Optimize myblog.py and make it more readable + Create up() to integerate both REST API mode and Password mode + Repair some bugs about failure retries. --- m2w/__init__.py | 3 +- m2w/up.py | 357 +++++++++++++-------------------------------- m2w/up_password.py | 277 +++++++++++++++++++++++++++++++++++ myblog.py | 120 +++------------ setup.py | 6 +- 5 files changed, 408 insertions(+), 355 deletions(-) create mode 100644 m2w/up_password.py diff --git a/m2w/__init__.py b/m2w/__init__.py index 874fcd9..f2f24bb 100644 --- a/m2w/__init__.py +++ b/m2w/__init__.py @@ -1,4 +1,5 @@ from .json2 import read_json_as_dict from .md5 import get_file_md5 -from .up import up, md_detect +from .up_password import md_detect +from .up import up from .wp import wp_xmlrpc diff --git a/m2w/up.py b/m2w/up.py index 3fbb587..8904692 100644 --- a/m2w/up.py +++ b/m2w/up.py @@ -1,277 +1,130 @@ -# -*- coding: utf-8 -*- -# @Time : 2022/12/03 16:42 -# @Author : huangwb8 -# @File : up.py -# @Function: m2w project -# @Software: VSCode -# @Reference: original -import m2w.update -import m2w.upload -from m2w.md5 import get_file_md5 -from m2w.json2 import save_dict_as_json -from m2w.json2 import read_json_as_dict +from m2w.rest_api import RestApi +from m2w.up_password import md_detect, up_password +from m2w.wp import wp_xmlrpc import sys import os - - -####===============================Functions -def find_files(path, suffix=".md"): - """ - ### Description - Find all files with specifed suffix in the path - - ### Parameters - path: String. The path of files. Allow one or more paths. - suffix: String. The suffix of target files. - - ### Return - List. Paths of target files. - - ### Reference - https://www.cnblogs.com/2bjiujiu/p/7255599.html - https://www.cnblogs.com/CGRun/p/16309265.html - - """ - # Gather results - result = [] - - # Use a sub function - def ff(path, suffix=".md"): - file_list = os.listdir(path) - for file in file_list: - cur_path = os.path.join(path, file) - # print(cur_path) - if os.path.isdir(cur_path): - ff(cur_path, suffix) - else: - if cur_path.endswith(suffix): - result.append(cur_path) - - # Output data - ff(path, suffix=".md") - return result - - -def md_detect(path_markdown, path_legacy_json, verbose=True): +import shutil + +async def up( + path_markdown, + path_legacy_json, + domain, username, password, application_password, post_metadata, + last_update_time_change = False, force_upload=False, verbose=True, rest_api=True, max_retries = 10 + ): + """ ### Description - Gather paths of brand-new and changed legacy markdown files. + Upload or update markdown files to your WordPress site. ### Parameters - + path_markdown: String. The path of markdown files. Allow one or more paths. - + path_legacy_json: String. The path of the 'legacy.json' file. - + verbose: Boolean. Whether output running messages of the function. + + path_markdown: The path of markdown files + + path_legacy_json: The path of legacy*.json + + domain, username, password, application_password, post_metadata: The data of a WordPress website + + last_update_time_change: Boolean. Whether to update the last update time of the post. Only work in REST API mode + + force_upload: Boolean. Whether check the existence of a new post before uploading. Default is False, which means that every new post would receive checking + + verbose: Boolean. Whether output running messages of the function + + rest_api: Whether to use REST API mode + + max_retries: Integer. The max retry time when meeting failure ### Return - Dict. With two keys———"legacy" and "new". - + The "new" means the brand-new markdown files in the "path_markdown" dir. - + The "legacy" means the changed legacy markdown files in the "path_markdown" dir. + None """ - # Test whether the path_markdown has existed - if len(path_markdown) == 0: - if verbose: - print('No path about markdown files. Please assign at least one.') - sys.exit(0) + # Backup legacy*.json + if os.path.exists(path_legacy_json): + shutil.copyfile(path_legacy_json, path_legacy_json + "_temporary-copy") - # Test whether the legacy_json has existed - if not os.path.isfile(path_legacy_json): - # Warning - if verbose: - print( - 'No legacy json. All markdown files would be treated as brand-new ones.' - ) - - # Gather new markdown files - new = [] - for path in path_markdown: - new = new + find_files(path, suffix=".md") - new = sorted( - set(new), key=new.index - ) # Keep unique elements. Ref: https://blog.csdn.net/u011361880/article/details/76237096 - - # md5 sum of new files - dict = {} - for i in new: - dict[i] = get_file_md5(i) - save_dict_as_json(dict, path_legacy_json) - if verbose: - print('Create legacy json for new markdowns!') + # Upload & Update + if rest_api: + # REST API Mode - # Output - result = {"new": new, "legacy": []} # - return result - else: if verbose: - print('With legacy.json. Confirm new or changed legacy markdown files.') - - # all files - all = [] - for path in path_markdown: - all = all + find_files(path, suffix=".md") - - # Compare changes in markdown files - md5_legacy = read_json_as_dict(path_legacy_json) - md5_all = {} - for i in all: - md5_all[i] = get_file_md5(i) - save_dict_as_json(md5_all, path_legacy_json) # Update legacy json - - # Confirm new files - new = set(md5_all.keys()).difference(set(md5_legacy.keys())) - if len(new) >= 1: - for j in new: - if verbose: - print('New content! ' + j) - md5_legacy[j] = get_file_md5(j) + print("(ฅ´ω`ฅ) REST API Mode. Very safe!") + rest = RestApi( + url=domain, wp_username=username, wp_password=application_password + ) + + # Gather paths of brand-new and changed legacy markdown files + res = md_detect(path_markdown, path_legacy_json, verbose=verbose) + md_upload = res["new"] + md_update = res["legacy"] + + if len(md_upload) > 0 or len(md_update) > 0: + # Use REST API mode to upload/update articles + for retry in range(max_retries): + try: + await rest.upload_article( + md_message=res, + post_metadata=post_metadata, + verbose=verbose, + force_upload=force_upload, + last_update=last_update_time_change, + ) + if os.path.exists(path_legacy_json + "_temporary-copy"): + os.remove(path_legacy_json + "_temporary-copy") + break + except Exception as e: + print("OOPS, the REST API mode failed!") + if os.path.exists(path_legacy_json + "_temporary-copy"): + os.remove(path_legacy_json) + os.rename( + path_legacy_json + "_temporary-copy", path_legacy_json + ) + if retry < max_retries - 1: + print("Retrying...") + continue + else: + print("Maximum retries exceeded. Exiting.") + sys.exit(0) else: if verbose: - print('No new markdown files. Ignored.') - # print(new) - # print(list(md5_all.keys())) - # print(list(md5_legacy.keys())) - - # Confirm changed legacy files - md5_filter = md5_all - intersect_key = set(sorted(md5_all.keys())) & set(sorted(md5_legacy.keys())) - for i in intersect_key: - if md5_legacy[i] == md5_all[i]: - md5_filter.pop(i) - else: - if verbose: - print('Content changed!: ', i) - if len(md5_filter) == 0: - if verbose: - print('No changed legacy markdown files. Ignored.') - - # Output - result = {"new": list(new), "legacy": list(md5_filter.keys())} # - return result - - -def up(client, md_upload, md_update, post_metadata, force_upload=False, verbose=True): - """ - ### Description - Upload or update markdown files to your WordPress site. - - ### Parameters - + client: The return of m2w.wp.wp_xmlrpc. - + md_upload: String. The path of new markdown files. - + md_upload: String. The path of changed legacy markdown files. - + post_metadata: Dict. The metadata of a post. - + force_upload: Boolean. Whether check the existence of a new post before uploading. Default is False, which means that every new post would receive checking. - + verbose: Boolean. Whether output running messages of the function. + print("Without any new or changed legacy markdown files. Ignored.") + else: + # Legacy Password Mode - ### Return - None - """ + if verbose: + print("Σ( ° △ °|||)︴Legacy Password Mode. Not safe!") - # Assistant function for uploading - def upload_one_post( - client, filepath, post_metadata, all_cnt, md_cnt, process_number, verbose - ): - post = m2w.upload.make_post(filepath, post_metadata) # Upload the new markdown - process_number2 = process_number + 1 - if post is not None: - m2w.upload.push_post(post, client) - if verbose: - md_cnt2 = md_cnt + 1 # Record an uploading event - print( - 'Process number: %d/%d SUCCESS: Push "%s"' - % (process_number2, all_cnt, filepath) - ) - else: - failpaths.append(filepath) - if verbose: - print( - 'Process number: %d/%d WARNING: Can\'t push "%s" because it\'s not Markdown file.' - % (process_number2, all_cnt, filepath) - ) - # Output a tuple - return md_cnt2, process_number2 + # Parameters + client = wp_xmlrpc(domain, username, password) - # Information about force uploading - if force_upload == False: - if verbose: - print( - "You don't want a force uploading. The existence of the post would be checked." - ) - else: - if verbose: - print("You want a force uploading? Great!") + # Gather paths of brand-new and changed legacy markdown files + res = md_detect(path_markdown, path_legacy_json, verbose=verbose) + md_upload = res["new"] + md_update = res["legacy"] - # Upload new markdown files - if len(md_upload) > 0: - md_cnt = 0 - process_number = 0 - all_cnt = len(md_upload) # Count parameters - failpaths = [] # Store failed uploaded markdown files - for filepath in md_upload: - if force_upload == False: - post_wp = m2w.update.find_post( - filepath, client - ) # Test whether this file had been existed in the WordPress site - if post_wp is not None: - if verbose: - print( - 'Warning: This post is existed in your WordPress site. Ignore uploading!' - ) - else: - if verbose: - print( - 'This post is exactly a new one in your WordPress site! Try uploading...' - ) - res = upload_one_post( + # Use Password mode to upload/update articles + if len(md_upload) > 0 or len(md_update) > 0: + for retry in range(max_retries): + try: + up_password( client, - filepath, + md_upload, + md_update, post_metadata, - all_cnt, - md_cnt, - process_number, - verbose, + force_upload=force_upload, + verbose=verbose, ) - md_cnt = +res[0] - process_number = +res[1] - else: - res = upload_one_post( - client, - filepath, - post_metadata, - all_cnt, - md_cnt, - process_number, - verbose, - ) - md_cnt = +res[0] - process_number = +res[1] - - if verbose: - print('SUCCESS: %d files have been pushed to your WordPress.' % md_cnt) - if len(failpaths) > 0: - print( - 'WARNING: %d files haven\'t been pushed to your WordPress.' - % len(failpaths) - ) - print('\nFailure to push these file paths:') - for failpath in failpaths: - print(failpath) + if os.path.exists(path_legacy_json + "_temporary-copy"): + os.remove(path_legacy_json + "_temporary-copy") + break + except Exception as e: + print("OOPS, the Password mode failed!") + if os.path.exists(path_legacy_json + "_temporary-copy"): + os.remove(path_legacy_json) + os.rename( + path_legacy_json + "_temporary-copy", path_legacy_json + ) + if retry < max_retries - 1: + print("Retrying...") + continue + else: + print("Maximum retries exceeded. Exiting.") + sys.exit(0) + + else: + if verbose: + print("Without any new or changed legacy markdown files. Ignored.") - # Update changed legacy markdown files - if len(md_update) > 0: - for filepath in md_update: - post = m2w.update.find_post(filepath, client) - if post is not None: - ret = m2w.update.update_post_content(post, filepath, client) - if ret: - if verbose: - print('SUCCESS: Update the file "%s"' % filepath) - else: - if verbose: - print('FAILURE: Update the file "%s"' % filepath) - else: - if verbose: - print( - 'FAILURE to find the post. Please check your User Configuration and the title in your WordPress.' - ) diff --git a/m2w/up_password.py b/m2w/up_password.py new file mode 100644 index 0000000..51f626f --- /dev/null +++ b/m2w/up_password.py @@ -0,0 +1,277 @@ +# -*- coding: utf-8 -*- +# @Time : 2022/12/03 16:42 +# @Author : huangwb8 +# @File : up.py +# @Function: m2w project +# @Software: VSCode +# @Reference: original + + +import m2w.update +import m2w.upload +from m2w.md5 import get_file_md5 +from m2w.json2 import save_dict_as_json +from m2w.json2 import read_json_as_dict +import sys +import os + + +####===============================Functions +def find_files(path, suffix=".md"): + """ + ### Description + Find all files with specifed suffix in the path + + ### Parameters + path: String. The path of files. Allow one or more paths. + suffix: String. The suffix of target files. + + ### Return + List. Paths of target files. + + ### Reference + https://www.cnblogs.com/2bjiujiu/p/7255599.html + https://www.cnblogs.com/CGRun/p/16309265.html + + """ + # Gather results + result = [] + + # Use a sub function + def ff(path, suffix=".md"): + file_list = os.listdir(path) + for file in file_list: + cur_path = os.path.join(path, file) + # print(cur_path) + if os.path.isdir(cur_path): + ff(cur_path, suffix) + else: + if cur_path.endswith(suffix): + result.append(cur_path) + + # Output data + ff(path, suffix=".md") + return result + + +def md_detect(path_markdown, path_legacy_json, verbose=True): + """ + ### Description + Gather paths of brand-new and changed legacy markdown files. + + ### Parameters + + path_markdown: String. The path of markdown files. Allow one or more paths. + + path_legacy_json: String. The path of the 'legacy.json' file. + + verbose: Boolean. Whether output running messages of the function. + + ### Return + Dict. With two keys———"legacy" and "new". + + The "new" means the brand-new markdown files in the "path_markdown" dir. + + The "legacy" means the changed legacy markdown files in the "path_markdown" dir. + """ + + # Test whether the path_markdown has existed + if len(path_markdown) == 0: + if verbose: + print('No path about markdown files. Please assign at least one.') + sys.exit(0) + + # Test whether the legacy_json has existed + if not os.path.isfile(path_legacy_json): + # Warning + if verbose: + print( + 'No legacy json. All markdown files would be treated as brand-new ones.' + ) + + # Gather new markdown files + new = [] + for path in path_markdown: + new = new + find_files(path, suffix=".md") + new = sorted( + set(new), key=new.index + ) # Keep unique elements. Ref: https://blog.csdn.net/u011361880/article/details/76237096 + + # md5 sum of new files + dict = {} + for i in new: + dict[i] = get_file_md5(i) + save_dict_as_json(dict, path_legacy_json) + if verbose: + print('Create legacy json for new markdowns!') + + # Output + result = {"new": new, "legacy": []} # + return result + else: + if verbose: + print('With legacy.json. Confirm new or changed legacy markdown files.') + + # all files + all = [] + for path in path_markdown: + all = all + find_files(path, suffix=".md") + + # Compare changes in markdown files + md5_legacy = read_json_as_dict(path_legacy_json) + md5_all = {} + for i in all: + md5_all[i] = get_file_md5(i) + save_dict_as_json(md5_all, path_legacy_json) # Update legacy json + + # Confirm new files + new = set(md5_all.keys()).difference(set(md5_legacy.keys())) + if len(new) >= 1: + for j in new: + if verbose: + print('New content! ' + j) + md5_legacy[j] = get_file_md5(j) + else: + if verbose: + print('No new markdown files. Ignored.') + # print(new) + # print(list(md5_all.keys())) + # print(list(md5_legacy.keys())) + + # Confirm changed legacy files + md5_filter = md5_all + intersect_key = set(sorted(md5_all.keys())) & set(sorted(md5_legacy.keys())) + for i in intersect_key: + if md5_legacy[i] == md5_all[i]: + md5_filter.pop(i) + else: + if verbose: + print('Content changed!: ', i) + if len(md5_filter) == 0: + if verbose: + print('No changed legacy markdown files. Ignored.') + + # Output + result = {"new": list(new), "legacy": list(md5_filter.keys())} # + return result + + +def up_password(client, md_upload, md_update, post_metadata, force_upload=False, verbose=True): + """ + ### Description + Upload or update markdown files to your WordPress site. + + ### Parameters + + client: The return of m2w.wp.wp_xmlrpc. + + md_upload: String. The path of new markdown files. + + md_upload: String. The path of changed legacy markdown files. + + post_metadata: Dict. The metadata of a post. + + force_upload: Boolean. Whether check the existence of a new post before uploading. Default is False, which means that every new post would receive checking. + + verbose: Boolean. Whether output running messages of the function. + + ### Return + None + """ + + # Assistant function for uploading + def upload_one_post( + client, filepath, post_metadata, all_cnt, md_cnt, process_number, verbose + ): + post = m2w.upload.make_post(filepath, post_metadata) # Upload the new markdown + process_number2 = process_number + 1 + if post is not None: + m2w.upload.push_post(post, client) + if verbose: + md_cnt2 = md_cnt + 1 # Record an uploading event + print( + 'Process number: %d/%d SUCCESS: Push "%s"' + % (process_number2, all_cnt, filepath) + ) + else: + failpaths.append(filepath) + if verbose: + print( + 'Process number: %d/%d WARNING: Can\'t push "%s" because it\'s not Markdown file.' + % (process_number2, all_cnt, filepath) + ) + # Output a tuple + return md_cnt2, process_number2 + + # Information about force uploading + if force_upload == False: + if verbose: + print( + "You don't want a force uploading. The existence of the post would be checked." + ) + else: + if verbose: + print("You want a force uploading? Great!") + + # Upload new markdown files + if len(md_upload) > 0: + md_cnt = 0 + process_number = 0 + all_cnt = len(md_upload) # Count parameters + failpaths = [] # Store failed uploaded markdown files + for filepath in md_upload: + if force_upload == False: + post_wp = m2w.update.find_post( + filepath, client + ) # Test whether this file had been existed in the WordPress site + if post_wp is not None: + if verbose: + print( + 'Warning: This post is existed in your WordPress site. Ignore uploading!' + ) + else: + if verbose: + print( + 'This post is exactly a new one in your WordPress site! Try uploading...' + ) + res = upload_one_post( + client, + filepath, + post_metadata, + all_cnt, + md_cnt, + process_number, + verbose, + ) + md_cnt = +res[0] + process_number = +res[1] + else: + res = upload_one_post( + client, + filepath, + post_metadata, + all_cnt, + md_cnt, + process_number, + verbose, + ) + md_cnt = +res[0] + process_number = +res[1] + + if verbose: + print('SUCCESS: %d files have been pushed to your WordPress.' % md_cnt) + if len(failpaths) > 0: + print( + 'WARNING: %d files haven\'t been pushed to your WordPress.' + % len(failpaths) + ) + print('\nFailure to push these file paths:') + for failpath in failpaths: + print(failpath) + + # Update changed legacy markdown files + if len(md_update) > 0: + for filepath in md_update: + post = m2w.update.find_post(filepath, client) + if post is not None: + ret = m2w.update.update_post_content(post, filepath, client) + if ret: + if verbose: + print('SUCCESS: Update the file "%s"' % filepath) + else: + if verbose: + print('FAILURE: Update the file "%s"' % filepath) + else: + if verbose: + print( + 'FAILURE to find the post. Please check your User Configuration and the title in your WordPress.' + ) diff --git a/myblog.py b/myblog.py index 3446470..a1ad536 100644 --- a/myblog.py +++ b/myblog.py @@ -7,11 +7,8 @@ # @Reference: original # ===============================Dependency -from m2w.rest_api import RestApi -from m2w import read_json_as_dict, md_detect, up, wp_xmlrpc +from m2w import read_json_as_dict, up import sys -import shutil -import os.path import asyncio import time @@ -64,115 +61,40 @@ async def main(): elif not use_rest_api and "password" in website: rest_api = False application_password = None + password = website["password"] elif use_rest_api and "password" in website: print( "Warning: You have REST API. Password would be ignored. You can remove password in the 'user.json' to make the use of m2w safer!" ) rest_api = True application_password = website["application_password"] + password = None else: rest_api = True application_password = website["application_password"] + password = None # Connect the WordPress website print("========Website: " + i) - - # Backup legacy*.json - if os.path.exists(path_legacy_json): - shutil.copyfile(path_legacy_json, path_legacy_json + "_temporary-copy") - - # Upload & Update - if rest_api: - # REST API Mode - - if verbose: - print("(ฅ´ω`ฅ) REST API Mode. Very safe!") - rest = RestApi( - url=domain, wp_username=username, wp_password=application_password - ) - - # Gather paths of brand-new and changed legacy markdown files - res = md_detect(path_markdown, path_legacy_json, verbose=verbose) - md_upload = res["new"] - md_update = res["legacy"] - - if len(md_upload) > 0 or len(md_update) > 0: - # Use REST API mode to upload/update articles - for retry in range(max_retries): - try: - await rest.upload_article( - md_message=res, - post_metadata=post_metadata, - verbose=verbose, - force_upload=force_upload, - last_update=last_update_time_change, - ) - if os.path.exists(path_legacy_json + "_temporary-copy"): - os.remove(path_legacy_json + "_temporary-copy") - break - except Exception as e: - print("OOPS, the REST API mode failed!") - if os.path.exists(path_legacy_json + "_temporary-copy"): - os.remove(path_legacy_json) - os.rename( - path_legacy_json + "_temporary-copy", path_legacy_json - ) - if retry < max_retries - 1: - print("Retrying...") - continue - else: - print("Maximum retries exceeded. Exiting.") - sys.exit(0) - else: - if verbose: - print("Without any new or changed legacy markdown files. Ignored.") - else: - # Legacy Password Mode - - if verbose: - print("Σ( ° △ °|||)︴Legacy Password Mode. Not safe!") + await up( + # The path of files + path_markdown = path_markdown, + path_legacy_json = path_legacy_json, + + # Website data + domain = domain, + username = username, + password = password, + application_password = application_password, + post_metadata = post_metadata, # Parameters - password = website["password"] - client = wp_xmlrpc(domain, username, password) - - # Gather paths of brand-new and changed legacy markdown files - res = md_detect(path_markdown, path_legacy_json, verbose=verbose) - md_upload = res["new"] - md_update = res["legacy"] - - # Use Password mode to upload/update articles - if len(md_upload) > 0 or len(md_update) > 0: - for retry in range(max_retries): - try: - up( - client, - md_upload, - md_update, - post_metadata, - force_upload=force_upload, - verbose=verbose, - ) - if os.path.exists(path_legacy_json + "_temporary-copy"): - os.remove(path_legacy_json + "_temporary-copy") - break - except Exception as e: - print("OOPS, the Password mode failed!") - if os.path.exists(path_legacy_json + "_temporary-copy"): - os.remove(path_legacy_json) - os.rename( - path_legacy_json + "_temporary-copy", path_legacy_json - ) - if retry < max_retries - 1: - print("Retrying...") - continue - else: - print("Maximum retries exceeded. Exiting.") - sys.exit(0) - - else: - if verbose: - print("Without any new or changed legacy markdown files. Ignored.") + last_update_time_change = last_update_time_change, + force_upload = force_upload, + verbose = verbose, + rest_api = rest_api, + max_retries = max_retries + ) if __name__ == "__main__": diff --git a/setup.py b/setup.py index 9935a00..059d30e 100644 --- a/setup.py +++ b/setup.py @@ -19,11 +19,11 @@ + 输入"pip install twine"来安装twine工具。 必要时可更新工具包:pip install --upgrade twine setuptools wheel -+ 使用"python setup.py sdist"命令来生成项目的源代码包。 如果要测试该包,可运行类似命令: python setup.py sdist; pip install .\dist\m2w-2.5.6.tar.gz ++ 使用"python setup.py sdist"命令来生成项目的源代码包。 如果要测试该包,可运行类似命令: python setup.py sdist; pip install .\dist\m2w-2.5.7.tar.gz + 使用"python setup.py bdist_wheel"命令来生成项目的长描述。 -+ 输入"twine upload dist/* --verbose"来上传项目的源代码包。 ++ 输入"twine upload dist/*2.5.7* --verbose"来上传项目的源代码包。 + 在上传过程中,你需要输入你在PyPi上注册的用户名和密码。 @@ -45,7 +45,7 @@ """ -VERSION = "2.5.6" +VERSION = "2.5.7" with open("README.md", "r", encoding="utf-8") as fh: long_description = fh.read()