Skip to content

Commit

Permalink
add: 自动获取OCR密钥
Browse files Browse the repository at this point in the history
  • Loading branch information
CberYellowstone committed Apr 25, 2022
1 parent c6805dc commit f997728
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 121 deletions.
3 changes: 2 additions & 1 deletion GUI.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,9 @@ def updateTranslatorList(self, _list:list):
[self.resultTextEditList[n].setPlaceholderText(eachTranslator) for n, eachTranslator in enumerate(self.TranslatorList)]
_len = _list.__len__()
if _len < 4:
if _len == 0: _len = 1
[each.setVisible(False) for each in self.resultTextEditList[_len-4:]]
n = 80*(4-_list.__len__())
n = 80*(4-_len)
self.setFixedSize(self.defaultWidth, self.defaultHeight-n)
self.move(self.defaultX, self.defaultY+n)
else:
Expand Down
211 changes: 91 additions & 120 deletions OCR.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,137 +12,108 @@
from urllib3 import disable_warnings
disable_warnings()

#http://xf.aka.today/v3/user_info.php?open_id=11111111111111111111111111111111

def cv2ImgToBytes(img):
    """Encode an OpenCV image as JPEG and return the encoded file bytes.

    Args:
        img: image accepted by ``cv2.imencode`` (presumably a numpy array
            as produced by OpenCV -- confirm against the caller).

    Returns:
        bytes: the JPEG-encoded image.
    """
    # Writing the raw array's tobytes() output to a file produces a file that
    # cannot be opened; the image has to be encoded into an image file format
    # (jpg or png) first and only then converted with tobytes().
    # The bytes produced here may differ from the bytes read back with
    # open(..., "rb"); once a file has been saved from these bytes, a later
    # f.read() and cv2ImgToBytes(img) yield the same result.
    _, encoded = cv2.imencode('.jpg', img)
    return encoded.tobytes()


class AssembleHeaderException(Exception):
    """Raised when a request URL cannot be split into host and path."""

    def __init__(self, msg):
        # Forward the message to Exception so args/str() carry it as well.
        super().__init__(msg)
        # Human-readable description of what was wrong with the URL.
        self.message = msg


class Url:
    """Plain holder for the three components of a request URL."""

    def __init__(self, host, path, schema):
        # host: authority part, path: everything from the first "/" on,
        # schema: the scheme prefix as supplied by the caller.
        self.host, self.path, self.schema = host, path, schema


# SHA-256 digest of the input, rendered as base64 text
def sha256base64(data) -> str:
    """Return the base64-encoded SHA-256 digest of *data*.

    Args:
        data: bytes-like object to hash.

    Returns:
        str: base64 text of the 32-byte digest.
    """
    digest = hashlib.sha256(data).digest()
    encoded = base64.b64encode(digest)
    return encoded.decode(encoding='utf-8')


def parse_url(requset_url: str):
    """Split a request URL into schema, host and path.

    Args:
        requset_url: URL of the form ``<schema>://<host>/<path>``.

    Returns:
        Url: ``host`` is the text between "://" and the first "/",
        ``path`` starts at that "/", and ``schema`` includes the "://".

    Raises:
        AssembleHeaderException: if the URL has no "://", no host, or no
            path component.
    """
    # str.find returns -1 when the separator is missing; str.index would
    # raise ValueError instead and bypass the invalid-URL checks below.
    schema_end = requset_url.find("://")
    if schema_end < 0:
        raise AssembleHeaderException(f"invalid request url:{requset_url}")
    schema = requset_url[:schema_end + 3]
    remainder = requset_url[schema_end + 3:]

    path_start = remainder.find("/")
    # -1: no path at all; 0: empty host (e.g. "https:///path").
    if path_start <= 0:
        raise AssembleHeaderException(f"invalid request url:{requset_url}")
    return Url(remainder[:path_start], remainder[path_start:], schema)


# build the signed (HMAC-SHA256) request url with auth data in the query string
def assemble_ws_auth_url(requset_url: str, method="POST", api_key="", api_secret=""):
    """Return *requset_url* with host, date and authorization query parameters.

    The signed text is ``host``, ``date`` and the HTTP request line joined by
    newlines; it is signed with HMAC-SHA256 using *api_secret*, and the
    base64 signature is embedded in a base64-encoded authorization string.

    Args:
        requset_url: endpoint URL; split with ``parse_url``.
        method: HTTP method placed in the signed request line.
        api_key: key identifier placed in the authorization string.
        api_secret: secret used as the HMAC key.

    Returns:
        str: ``"<requset_url>?<urlencoded host/date/authorization>"``.
    """
    parsed = parse_url(requset_url)

    # Current local time converted to an epoch timestamp (whole seconds),
    # then formatted by format_date_time.
    timestamp = mktime(datetime.now().timetuple())
    date = format_date_time(timestamp)

    request_line = f"{method} {parsed.path} HTTP/1.1"
    string_to_sign = "\n".join((f"host: {parsed.host}", f"date: {date}", request_line))
    raw_signature = hmac.new(
        api_secret.encode('utf-8'),
        string_to_sign.encode('utf-8'),
        digestmod=hashlib.sha256,
    ).digest()
    signature = base64.b64encode(raw_signature).decode(encoding='utf-8')

    authorization_plain = (
        f'api_key="{api_key}", algorithm="hmac-sha256", '
        f'headers="host date request-line", signature="{signature}"'
    )
    authorization = base64.b64encode(authorization_plain.encode('utf-8')).decode(encoding='utf-8')

    query = urlencode({"host": parsed.host, "date": date, "authorization": authorization})
    return f'{requset_url}?{query}'


def formatJson(JsonRawText: str) -> str:
    """Extract the recognised text from an OCR result JSON document.

    The document is expected to look like
    ``{"pages": [{"lines": [{"words": [{"content": "..."}]}]}]}``.
    The ``content`` of every word of every line of every page is
    concatenated in document order.

    Args:
        JsonRawText: JSON text of the OCR result.

    Returns:
        str: the concatenated text, or the fallback message
        ``'无法获取有效内容OvO'`` when an expected key is missing or no text
        was found at all.
    """
    fallback = '无法获取有效内容OvO'
    JsonRawDict = json.loads(JsonRawText)
    parts = []
    try:
        for eachPages in JsonRawDict['pages']:
            for eachLines in eachPages['lines']:
                # Collect across all lines; resetting per line would drop
                # everything except a single line of the result.
                parts.extend(eachWords['content'] for eachWords in eachLines['words'])
    except KeyError:
        return fallback
    # An empty result (no pages/lines/words) gets the same fallback as a
    # malformed one instead of failing on an unbound accumulator.
    return "".join(parts) or fallback

def checkSecretAvailable(appId: str, apiSecret: str, apiKey: str, timeout: float = 10.0) -> bool:
    """Probe the OCR endpoint to see whether a credential triple is usable.

    A request without any image payload is sent; the credentials are treated
    as usable when the response's ``header.code`` equals 10009.
    NOTE(review): 10009 is presumably the service's "invalid input" code,
    i.e. authentication passed but the payload was rejected -- confirm
    against the service's error-code table.

    Args:
        appId: application id sent in the body header and HTTP headers.
        apiSecret: secret used to sign the request URL.
        apiKey: key identifier embedded in the authorization string.
        timeout: seconds to wait for the HTTP request before giving up.

    Returns:
        bool: True when the probe answered with code 10009; False on any
        other code, on network errors/timeouts, or on a malformed response.
    """
    url = 'https://api.xf-yun.com/v1/private/s00b65163'
    _body = {
        "header": {"app_id": appId, "status": 3},
        "parameter": {
            "s00b65163": {
                "category": "mix0",
                "result": {"encoding": "utf8", "compress": "raw", "format": "json"},
            },
        },
    }
    request_url = assemble_ws_auth_url(url, "POST", apiKey, apiSecret)
    headers = {'content-type': "application/json", 'host': 'api.xf-yun.com', 'app_id': appId}
    try:
        # NOTE(review): verify=False disables TLS certificate verification;
        # kept as in the original, but it exposes the credentials to MITM.
        # A timeout is required: requests waits indefinitely by default.
        response = requests.post(
            request_url,
            data=json.dumps(_body),
            headers=headers,
            verify=False,
            timeout=timeout,
        )
        return json.loads(response.content.decode())['header']['code'] == 10009
    except Exception:
        # Best-effort probe: any failure simply means "not usable".
        return False

'''
appid、apiSecret、apiKey请到讯飞开放平台控制台获取并填写到此demo中;
图像数据,base64编码后大小不得超过4M
'''
# 请到控制台获取以下信息,并填写
def getOCRResult(img, timeout: float = 10.0) -> str:
    """Run OCR on an image through the remote service and return its text.

    The image is encoded with ``cv2ImgToBytes``, base64-encoded into the
    request payload, and posted to a URL signed by ``assemble_ws_auth_url``.
    The base64 ``payload.result.text`` field of the response is decoded,
    stripped of spaces, newlines and tabs, and passed to ``formatJson``.

    Args:
        img: image accepted by ``cv2ImgToBytes``.
        timeout: seconds to wait for the HTTP request before giving up.

    Returns:
        str: the recognised text, or ``'OCR出错:<error>'`` when the request
        fails or the response cannot be decoded.
    """
    # NOTE(review): credentials are committed in source; consider loading
    # them from configuration or from the fetched secret list instead.
    APPId = "c788b7aa"
    APISecret = "ODFmODQwZWJmZDhlNTIzOTljNGI3OTcy"
    APIKey = "7ec53833f14724cffb810c14e72eef0d"

    # The service limits the base64-encoded image to 4 MB.
    imageBytes = cv2ImgToBytes(img)

    url = 'https://api.xf-yun.com/v1/private/s00b65163'
    body = {
        "header": {"app_id": APPId, "status": 3},
        "parameter": {
            "s00b65163": {
                "category": "mix0",
                "result": {"encoding": "utf8", "compress": "raw", "format": "json"},
            },
        },
        "payload": {
            "s00b65163_data_1": {
                # NOTE(review): declared as "png" although cv2ImgToBytes
                # produces JPEG data -- confirm the service tolerates this.
                "encoding": "png",
                "image": str(base64.b64encode(imageBytes), 'UTF-8'),
                "status": 3,
            },
        },
    }
    request_url = assemble_ws_auth_url(url, "POST", APIKey, APISecret)
    headers = {'content-type': "application/json", 'host': 'api.xf-yun.com', 'app_id': APPId}
    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        # A timeout is required: requests waits indefinitely by default.
        response = requests.post(
            request_url,
            data=json.dumps(body),
            headers=headers,
            verify=False,
            timeout=timeout,
        )
        tempResult = json.loads(response.content.decode())
        finalResult = base64.b64decode(tempResult['payload']['result']['text']).decode()
        # Whitespace is removed from the whole JSON text, which also removes
        # spaces inside the recognised content.
        finalResult = finalResult.replace(" ", "").replace("\n", "").replace("\t", "").strip()
    except Exception as err:
        return f'OCR出错:{err}'

    return formatJson(finalResult)

def getOCRSecret(timeout: float = 10.0) -> tuple:
    """Download the shared OCR credential list.

    The endpoint's response body is base64-decoded and parsed as JSON; the
    entries under ``data.all_share`` are reduced to credential triples.
    Entries whose ``apiKey2`` or ``apiSecret2`` is empty are skipped.

    Args:
        timeout: seconds to wait for the HTTP request before giving up.

    Returns:
        tuple: ``(appId, apiSecret2, apiKey2)`` triples, in response order.

    Raises:
        Exception: network, decoding and missing-key errors are not caught
            here and propagate to the caller.
    """
    url = "https://getocrsecret.ystone.workers.dev/"
    # A timeout is required: requests waits indefinitely by default.
    response = requests.get(url, timeout=timeout)
    payload = json.loads(base64.b64decode(response.content))
    return tuple(
        (eachDict['appId'], eachDict['apiSecret2'], eachDict['apiKey2'])
        for eachDict in payload['data']['all_share']
        if eachDict['apiKey2'] and eachDict['apiSecret2']
    )

def getVaildOCRSecert() -> tuple:
    """Return the downloaded credential triples that pass the availability probe.

    Every triple from ``getOCRSecret`` is checked with
    ``checkSecretAvailable`` (one probe per triple, in order); only the
    triples for which the probe returns True are kept.

    Returns:
        tuple: the usable ``(appId, apiSecret, apiKey)`` triples.
    """
    usable = []
    for candidate in getOCRSecret():
        if checkSecretAvailable(candidate[0], candidate[1], candidate[2]):
            usable.append(candidate)
    return tuple(usable)

0 comments on commit f997728

Please sign in to comment.