Skip to content
This repository has been archived by the owner on Aug 30, 2024. It is now read-only.

Commit

Permalink
bunq update #13 related API changes
Browse files Browse the repository at this point in the history
  • Loading branch information
woudt committed Jan 29, 2020
1 parent ed27527 commit bacf43e
Showing 1 changed file with 21 additions and 22 deletions.
43 changes: 21 additions & 22 deletions app/bunq.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import requests

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, hmac, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
Expand Down Expand Up @@ -356,10 +357,6 @@ def request(method, endpoint, config, data=None, extra_headers=None):
headers = {
'Cache-Control': 'no-cache',
'User-Agent': NAME,
'X-Bunq-Client-Request-Id': '0',
'X-Bunq-Geolocation': '0 0 0 0 000',
'X-Bunq-Language': 'en_US',
'X-Bunq-Region': 'en_US',
}
if extra_headers is not None:
for extra in extra_headers:
Expand All @@ -368,7 +365,7 @@ def request(method, endpoint, config, data=None, extra_headers=None):
headers['X-Bunq-Client-Authentication'] = get_install_token(config)
elif endpoint != "v1/installation":
headers['X-Bunq-Client-Authentication'] = get_session_token(config)
sign(method, endpoint, config, headers, data)
sign(endpoint, config, headers, data)
if method == "GET":
reply = requests.get(BUNQAPI + endpoint, headers=headers)
elif method == "POST":
Expand All @@ -387,18 +384,11 @@ def request(method, endpoint, config, data=None, extra_headers=None):
return reply.json()
return reply.text

def sign(method, endpoint, config, headers, data):
def sign(endpoint, config, headers, data):
""" Sign the message before sending """
if endpoint == "v1/installation":
return # Installation call is not signed
message = method + " /" + endpoint + "\n"
for name in sorted(headers.keys()):
message += name + ": " + headers[name] + "\n"
message += "\n"
if isinstance(data, bytes):
message = message.encode("ascii") + data
else:
message = (message + data).encode("ascii")
message = data.encode("ascii")
key = get_private_key(config)
sig = key.sign(message, padding.PKCS1v15(), hashes.SHA256())
sig_str = base64.b64encode(sig).decode("ascii")
Expand All @@ -413,13 +403,22 @@ def verify(endpoint, config, status_code, headers, text):
if "Error" in result:
print(result)
return # Errors are not signed
message = str(status_code) + "\n"
for name in sorted(headers.keys()):
if name.startswith("X-Bunq-") and name != "X-Bunq-Server-Signature":
message += name + ": " + headers[name] + "\n"
message += "\n" + text

sig = base64.b64decode(headers["X-Bunq-Server-Signature"])
key = get_server_key(config)
# Will throw an exception on failure
key.verify(sig, message.encode("ascii"),
padding.PKCS1v15(), hashes.SHA256())
try: # try new body signing first
key.verify(sig, text.encode("ascii"),
padding.PKCS1v15(), hashes.SHA256())

except InvalidSignature: # fall back to old signing
print("Fallback to old signature verification method")
message = str(status_code) + "\n"
for name in sorted(headers.keys()):
if name[:7] == "X-Bunq-" and name != "X-Bunq-Server-Signature":
message += name + ": " + headers[name] + "\n"
message += "\n" + text
try:
key.verify(sig, message.encode("ascii"),
padding.PKCS1v15(), hashes.SHA256())
except InvalidSignature:
print("WARNING: signature verification failed!")

0 comments on commit bacf43e

Please sign in to comment.