-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
+ Optimize myblog.py and make it more readable + Create up() to integerate both REST API mode and Password mode + Repair some bugs about failure retries.
- Loading branch information
Showing
5 changed files
with
408 additions
and
355 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
from .json2 import read_json_as_dict | ||
from .md5 import get_file_md5 | ||
from .up import up, md_detect | ||
from .up_password import md_detect | ||
from .up import up | ||
from .wp import wp_xmlrpc |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,277 +1,130 @@ | ||
# -*- coding: utf-8 -*- | ||
# @Time : 2022/12/03 16:42 | ||
# @Author : huangwb8 | ||
# @File : up.py | ||
# @Function: m2w project | ||
# @Software: VSCode | ||
# @Reference: original | ||
|
||
|
||
import m2w.update | ||
import m2w.upload | ||
from m2w.md5 import get_file_md5 | ||
from m2w.json2 import save_dict_as_json | ||
from m2w.json2 import read_json_as_dict | ||
from m2w.rest_api import RestApi | ||
from m2w.up_password import md_detect, up_password | ||
from m2w.wp import wp_xmlrpc | ||
import sys | ||
import os | ||
|
||
|
||
####===============================Functions | ||
def find_files(path, suffix=".md"): | ||
""" | ||
### Description | ||
Find all files with specifed suffix in the path | ||
### Parameters | ||
path: String. The path of files. Allow one or more paths. | ||
suffix: String. The suffix of target files. | ||
### Return | ||
List. Paths of target files. | ||
### Reference | ||
https://www.cnblogs.com/2bjiujiu/p/7255599.html | ||
https://www.cnblogs.com/CGRun/p/16309265.html | ||
""" | ||
# Gather results | ||
result = [] | ||
|
||
# Use a sub function | ||
def ff(path, suffix=".md"): | ||
file_list = os.listdir(path) | ||
for file in file_list: | ||
cur_path = os.path.join(path, file) | ||
# print(cur_path) | ||
if os.path.isdir(cur_path): | ||
ff(cur_path, suffix) | ||
else: | ||
if cur_path.endswith(suffix): | ||
result.append(cur_path) | ||
|
||
# Output data | ||
ff(path, suffix=".md") | ||
return result | ||
|
||
|
||
def md_detect(path_markdown, path_legacy_json, verbose=True): | ||
import shutil | ||
|
||
async def up( | ||
path_markdown, | ||
path_legacy_json, | ||
domain, username, password, application_password, post_metadata, | ||
last_update_time_change = False, force_upload=False, verbose=True, rest_api=True, max_retries = 10 | ||
): | ||
|
||
""" | ||
### Description | ||
Gather paths of brand-new and changed legacy markdown files. | ||
Upload or update markdown files to your WordPress site. | ||
### Parameters | ||
+ path_markdown: String. The path of markdown files. Allow one or more paths. | ||
+ path_legacy_json: String. The path of the 'legacy.json' file. | ||
+ verbose: Boolean. Whether output running messages of the function. | ||
+ path_markdown: The path of markdown files | ||
+ path_legacy_json: The path of legacy*.json | ||
+ domain, username, password, application_password, post_metadata: The data of a WordPress website | ||
+ last_update_time_change: Boolean. Whether to update the last update time of the post. Only work in REST API mode | ||
+ force_upload: Boolean. Whether check the existence of a new post before uploading. Default is False, which means that every new post would receive checking | ||
+ verbose: Boolean. Whether output running messages of the function | ||
+ rest_api: Whether to use REST API mode | ||
+ max_retries: Integer. The max retry time when meeting failure | ||
### Return | ||
Dict. With two keys———"legacy" and "new". | ||
+ The "new" means the brand-new markdown files in the "path_markdown" dir. | ||
+ The "legacy" means the changed legacy markdown files in the "path_markdown" dir. | ||
None | ||
""" | ||
|
||
# Test whether the path_markdown has existed | ||
if len(path_markdown) == 0: | ||
if verbose: | ||
print('No path about markdown files. Please assign at least one.') | ||
sys.exit(0) | ||
# Backup legacy*.json | ||
if os.path.exists(path_legacy_json): | ||
shutil.copyfile(path_legacy_json, path_legacy_json + "_temporary-copy") | ||
|
||
# Test whether the legacy_json has existed | ||
if not os.path.isfile(path_legacy_json): | ||
# Warning | ||
if verbose: | ||
print( | ||
'No legacy json. All markdown files would be treated as brand-new ones.' | ||
) | ||
|
||
# Gather new markdown files | ||
new = [] | ||
for path in path_markdown: | ||
new = new + find_files(path, suffix=".md") | ||
new = sorted( | ||
set(new), key=new.index | ||
) # Keep unique elements. Ref: https://blog.csdn.net/u011361880/article/details/76237096 | ||
|
||
# md5 sum of new files | ||
dict = {} | ||
for i in new: | ||
dict[i] = get_file_md5(i) | ||
save_dict_as_json(dict, path_legacy_json) | ||
if verbose: | ||
print('Create legacy json for new markdowns!') | ||
# Upload & Update | ||
if rest_api: | ||
# REST API Mode | ||
|
||
# Output | ||
result = {"new": new, "legacy": []} # | ||
return result | ||
else: | ||
if verbose: | ||
print('With legacy.json. Confirm new or changed legacy markdown files.') | ||
|
||
# all files | ||
all = [] | ||
for path in path_markdown: | ||
all = all + find_files(path, suffix=".md") | ||
|
||
# Compare changes in markdown files | ||
md5_legacy = read_json_as_dict(path_legacy_json) | ||
md5_all = {} | ||
for i in all: | ||
md5_all[i] = get_file_md5(i) | ||
save_dict_as_json(md5_all, path_legacy_json) # Update legacy json | ||
|
||
# Confirm new files | ||
new = set(md5_all.keys()).difference(set(md5_legacy.keys())) | ||
if len(new) >= 1: | ||
for j in new: | ||
if verbose: | ||
print('New content! ' + j) | ||
md5_legacy[j] = get_file_md5(j) | ||
print("(ฅ´ω`ฅ) REST API Mode. Very safe!") | ||
rest = RestApi( | ||
url=domain, wp_username=username, wp_password=application_password | ||
) | ||
|
||
# Gather paths of brand-new and changed legacy markdown files | ||
res = md_detect(path_markdown, path_legacy_json, verbose=verbose) | ||
md_upload = res["new"] | ||
md_update = res["legacy"] | ||
|
||
if len(md_upload) > 0 or len(md_update) > 0: | ||
# Use REST API mode to upload/update articles | ||
for retry in range(max_retries): | ||
try: | ||
await rest.upload_article( | ||
md_message=res, | ||
post_metadata=post_metadata, | ||
verbose=verbose, | ||
force_upload=force_upload, | ||
last_update=last_update_time_change, | ||
) | ||
if os.path.exists(path_legacy_json + "_temporary-copy"): | ||
os.remove(path_legacy_json + "_temporary-copy") | ||
break | ||
except Exception as e: | ||
print("OOPS, the REST API mode failed!") | ||
if os.path.exists(path_legacy_json + "_temporary-copy"): | ||
os.remove(path_legacy_json) | ||
os.rename( | ||
path_legacy_json + "_temporary-copy", path_legacy_json | ||
) | ||
if retry < max_retries - 1: | ||
print("Retrying...") | ||
continue | ||
else: | ||
print("Maximum retries exceeded. Exiting.") | ||
sys.exit(0) | ||
else: | ||
if verbose: | ||
print('No new markdown files. Ignored.') | ||
# print(new) | ||
# print(list(md5_all.keys())) | ||
# print(list(md5_legacy.keys())) | ||
|
||
# Confirm changed legacy files | ||
md5_filter = md5_all | ||
intersect_key = set(sorted(md5_all.keys())) & set(sorted(md5_legacy.keys())) | ||
for i in intersect_key: | ||
if md5_legacy[i] == md5_all[i]: | ||
md5_filter.pop(i) | ||
else: | ||
if verbose: | ||
print('Content changed!: ', i) | ||
if len(md5_filter) == 0: | ||
if verbose: | ||
print('No changed legacy markdown files. Ignored.') | ||
|
||
# Output | ||
result = {"new": list(new), "legacy": list(md5_filter.keys())} # | ||
return result | ||
|
||
|
||
def up(client, md_upload, md_update, post_metadata, force_upload=False, verbose=True): | ||
""" | ||
### Description | ||
Upload or update markdown files to your WordPress site. | ||
### Parameters | ||
+ client: The return of m2w.wp.wp_xmlrpc. | ||
+ md_upload: String. The path of new markdown files. | ||
+ md_upload: String. The path of changed legacy markdown files. | ||
+ post_metadata: Dict. The metadata of a post. | ||
+ force_upload: Boolean. Whether check the existence of a new post before uploading. Default is False, which means that every new post would receive checking. | ||
+ verbose: Boolean. Whether output running messages of the function. | ||
print("Without any new or changed legacy markdown files. Ignored.") | ||
else: | ||
# Legacy Password Mode | ||
|
||
### Return | ||
None | ||
""" | ||
if verbose: | ||
print("Σ( ° △ °|||)︴Legacy Password Mode. Not safe!") | ||
|
||
# Assistant function for uploading | ||
def upload_one_post( | ||
client, filepath, post_metadata, all_cnt, md_cnt, process_number, verbose | ||
): | ||
post = m2w.upload.make_post(filepath, post_metadata) # Upload the new markdown | ||
process_number2 = process_number + 1 | ||
if post is not None: | ||
m2w.upload.push_post(post, client) | ||
if verbose: | ||
md_cnt2 = md_cnt + 1 # Record an uploading event | ||
print( | ||
'Process number: %d/%d SUCCESS: Push "%s"' | ||
% (process_number2, all_cnt, filepath) | ||
) | ||
else: | ||
failpaths.append(filepath) | ||
if verbose: | ||
print( | ||
'Process number: %d/%d WARNING: Can\'t push "%s" because it\'s not Markdown file.' | ||
% (process_number2, all_cnt, filepath) | ||
) | ||
# Output a tuple | ||
return md_cnt2, process_number2 | ||
# Parameters | ||
client = wp_xmlrpc(domain, username, password) | ||
|
||
# Information about force uploading | ||
if force_upload == False: | ||
if verbose: | ||
print( | ||
"You don't want a force uploading. The existence of the post would be checked." | ||
) | ||
else: | ||
if verbose: | ||
print("You want a force uploading? Great!") | ||
# Gather paths of brand-new and changed legacy markdown files | ||
res = md_detect(path_markdown, path_legacy_json, verbose=verbose) | ||
md_upload = res["new"] | ||
md_update = res["legacy"] | ||
|
||
# Upload new markdown files | ||
if len(md_upload) > 0: | ||
md_cnt = 0 | ||
process_number = 0 | ||
all_cnt = len(md_upload) # Count parameters | ||
failpaths = [] # Store failed uploaded markdown files | ||
for filepath in md_upload: | ||
if force_upload == False: | ||
post_wp = m2w.update.find_post( | ||
filepath, client | ||
) # Test whether this file had been existed in the WordPress site | ||
if post_wp is not None: | ||
if verbose: | ||
print( | ||
'Warning: This post is existed in your WordPress site. Ignore uploading!' | ||
) | ||
else: | ||
if verbose: | ||
print( | ||
'This post is exactly a new one in your WordPress site! Try uploading...' | ||
) | ||
res = upload_one_post( | ||
# Use Password mode to upload/update articles | ||
if len(md_upload) > 0 or len(md_update) > 0: | ||
for retry in range(max_retries): | ||
try: | ||
up_password( | ||
client, | ||
filepath, | ||
md_upload, | ||
md_update, | ||
post_metadata, | ||
all_cnt, | ||
md_cnt, | ||
process_number, | ||
verbose, | ||
force_upload=force_upload, | ||
verbose=verbose, | ||
) | ||
md_cnt = +res[0] | ||
process_number = +res[1] | ||
else: | ||
res = upload_one_post( | ||
client, | ||
filepath, | ||
post_metadata, | ||
all_cnt, | ||
md_cnt, | ||
process_number, | ||
verbose, | ||
) | ||
md_cnt = +res[0] | ||
process_number = +res[1] | ||
|
||
if verbose: | ||
print('SUCCESS: %d files have been pushed to your WordPress.' % md_cnt) | ||
if len(failpaths) > 0: | ||
print( | ||
'WARNING: %d files haven\'t been pushed to your WordPress.' | ||
% len(failpaths) | ||
) | ||
print('\nFailure to push these file paths:') | ||
for failpath in failpaths: | ||
print(failpath) | ||
if os.path.exists(path_legacy_json + "_temporary-copy"): | ||
os.remove(path_legacy_json + "_temporary-copy") | ||
break | ||
except Exception as e: | ||
print("OOPS, the Password mode failed!") | ||
if os.path.exists(path_legacy_json + "_temporary-copy"): | ||
os.remove(path_legacy_json) | ||
os.rename( | ||
path_legacy_json + "_temporary-copy", path_legacy_json | ||
) | ||
if retry < max_retries - 1: | ||
print("Retrying...") | ||
continue | ||
else: | ||
print("Maximum retries exceeded. Exiting.") | ||
sys.exit(0) | ||
|
||
else: | ||
if verbose: | ||
print("Without any new or changed legacy markdown files. Ignored.") | ||
|
||
# Update changed legacy markdown files | ||
if len(md_update) > 0: | ||
for filepath in md_update: | ||
post = m2w.update.find_post(filepath, client) | ||
if post is not None: | ||
ret = m2w.update.update_post_content(post, filepath, client) | ||
if ret: | ||
if verbose: | ||
print('SUCCESS: Update the file "%s"' % filepath) | ||
else: | ||
if verbose: | ||
print('FAILURE: Update the file "%s"' % filepath) | ||
else: | ||
if verbose: | ||
print( | ||
'FAILURE to find the post. Please check your User Configuration and the title in your WordPress.' | ||
) |
Oops, something went wrong.