-
Notifications
You must be signed in to change notification settings - Fork 9
/
jira_oauth_token_generator.py
201 lines (163 loc) · 7.84 KB
/
jira_oauth_token_generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
'''
* Original implementation is available here: https://bitbucket.org/atlassian_tutorial/atlassian-oauth-examples under python/app.py
* Copied here as jira_oauth_token_generator.py with modifications:
* Since we are not able to resolve SSL Certification problem, let's disable ssl certificate validation for each REST api call.
client.disable_ssl_certificate_validation = True
* Strangely first time (before you approve request in browser) when you access data_url, browser returns 200 response with content of zero bytes instead of 401 response as original code says. Hence I've commented out response code validation for this part.
* Also I've refactored and removed SignatureMethod_RSA_SHA1 into it's own file. This way we can import it into any other python program!
* Python 3 compatible!
* Pre-Requisities:
* You have generated RSA Private and Public keys and stored them into files "oauth.pem" and "oauth.pub" respectively.
* config/ directory contain "starter_oauth.config" file with following details:
jira_base_url=<JIRA Application URL>
consumer_key=<enter as registered by your Jira Admin during Application Link creation>
*
'''
import base64
import os
from pathlib import Path
import json
from urllib import parse
import oauth2 as oauth
from tlslite.utils import keyfactory
import argparse
from configparser import ConfigParser
class SignatureMethod_RSA_SHA1(oauth.SignatureMethod):
name = 'RSA-SHA1'
def signing_base(self, request, consumer, token):
if not hasattr(request,
'normalized_url') or request.normalized_url is None:
raise ValueError("Base URL for request is not set.")
sig = (
oauth.escape(request.method),
oauth.escape(request.normalized_url),
oauth.escape(request.get_normalized_parameters()),
)
key = '%s&' % oauth.escape(consumer.secret)
if token:
key += oauth.escape(token.secret)
raw = '&'.join(sig)
return key, raw
def sign(self, request, consumer, token):
"""Builds the base signature string."""
key, raw = self.signing_base(request, consumer, token)
# Load RSA Private Key
with open( Path.home() / '.oauthconfig/oauth.pem', 'r') as f:
data = f.read()
privateKeyString = data.strip()
privatekey = keyfactory.parsePrivateKey(privateKeyString)
'''
We were getting errors on encoding, so added explicitly "utf8" encoding in rsakey.py
In function hashAndSign(...),
*changed line* -> hashBytes = SHA1(bytearray(bytes))
to -> hashBytes = SHA1(bytearray(bytes, "utf8"))
'''
signature = privatekey.hashAndSign(raw)
return base64.b64encode(signature)
def get_jira_oauth_init_parameters():
# Assumption "starter_auth.config" file is stored in ~/.oauthconfig/ directory
starter_oauth_config_file = Path.home() / ".oauthconfig/starter_oauth.config"
config = ConfigParser()
config.optionxform=str # Read config file as case insensitive
config.read(starter_oauth_config_file)
jira_url = config.get("oauth_config", "jira_base_url")
consumer_key = config.get("oauth_config", "consumer_key")
test_jira_issue = config.get("oauth_config","test_jira_issue")
rsa_public_key = None
with open ( Path.home() / '.oauthconfig/oauth.pub', 'r') as key_cert_file:
rsa_public_key = key_cert_file.read()
return {
"consumer_key" : consumer_key,
"jira_base_url" : jira_url,
"rsa_public_key": rsa_public_key,
"test_jira_issue": test_jira_issue
}
init_dict = get_jira_oauth_init_parameters()
consumer_key = init_dict["consumer_key"]
consumer_secret = init_dict["rsa_public_key"]
test_jira_issue = init_dict["test_jira_issue"]
base_url = init_dict["jira_base_url"]
request_token_url = base_url + '/plugins/servlet/oauth/request-token'
access_token_url = base_url + '/plugins/servlet/oauth/access-token'
authorize_url = base_url + '/plugins/servlet/oauth/authorize'
data_url = base_url + f'/rest/api/2/issue/{test_jira_issue}?fields=summary'
consumer = oauth.Consumer(consumer_key, consumer_secret)
client = oauth.Client(consumer)
client.disable_ssl_certificate_validation = True
# Lets try to reterive mentioned Jira issue
resp, content = client.request(data_url, "GET")
'''
# As per original code, we should get 401, but browser returns 200.
if resp['status'] != '401':
raise Exception("Should have no access!")
'''
consumer = oauth.Consumer(consumer_key, consumer_secret)
client = oauth.Client(consumer)
client.set_signature_method(SignatureMethod_RSA_SHA1())
# Step 1: Get a request token. This is a temporary token that is used for
# having the user authorize an access token and to sign the request to obtain
# said access token.
resp, content = client.request(request_token_url, "POST")
if resp['status'] != '200':
raise Exception("Invalid response %s: %s" % (resp['status'], content))
# If output is in bytes. Let's convert it into String.
if type(content) == bytes:
content = content.decode('UTF-8')
request_token = dict(parse.parse_qsl(content))
print ("Request Token:")
print (f" - oauth_token = {request_token['oauth_token']}")
print (" - oauth_token_secret = %s" % request_token['oauth_token_secret'])
print ("")
# Step 2: Redirect to the provider. Since this is a CLI script we do not
# redirect. In a web application you would redirect the user to the URL
# below.
print ("Go to the following link in your browser:")
print ("%s?oauth_token=%s" % (authorize_url, request_token['oauth_token']))
print
# After the user has granted access to you, the consumer, the provider will
# redirect you to whatever URL you have told them to redirect to. You can
# usually define this in the oauth_callback argument as well.
accepted = 'n'
while accepted.lower() == 'n':
accepted = input('Have you authorized me? (y/n) ')
# oauth_verifier = raw_input('What is the PIN? ')
# Step 3: Once the consumer has redirected the user back to the oauth_callback
# URL you can request the access token the user has approved. You use the
# request token to sign this request. After this is done you throw away the
# request token and use the access token returned. You should store this
# access token somewhere safe, like a database, for future use.
token = oauth.Token(request_token['oauth_token'],
request_token['oauth_token_secret'])
# token.set_verifier(oauth_verifier)
client = oauth.Client(consumer, token)
client.set_signature_method(SignatureMethod_RSA_SHA1())
resp, content = client.request(access_token_url, "POST")
# Reponse is coming in bytes. Let's convert it into String.
# If output is in bytes. Let's convert it into String.
if type(content) == bytes:
content = content.decode('UTF-8')
access_token = dict(parse.parse_qsl(content))
print ("Access Token:")
print (" - oauth_token = %s" % access_token['oauth_token'])
print (" - oauth_token_secret = %s" % access_token['oauth_token_secret'])
print ("")
print ("You may now access protected resources using the access tokens above.")
print ("")
print("")
print(f"Accessing {test_jira_issue} using generated OAuth tokens:")
print("")
# Now lets try to access the same issue again with the access token. We should get a 200!
accessToken = oauth.Token(access_token['oauth_token'],
access_token['oauth_token_secret'])
client = oauth.Client(consumer, accessToken)
client.set_signature_method(SignatureMethod_RSA_SHA1())
resp, content = client.request(data_url, "GET")
if resp['status'] != '200':
raise Exception("Should have access!")
print("Success!")
# If output is in bytes. Let's convert it into String.
if type(content) == bytes:
content = content.decode('UTF-8')
json_content = json.loads(content)
print(f'Issue key: {json_content["key"]}, Summary: {json_content["fields"]["summary"]} ')
print("")