diff --git a/src/server/api/user_api.py b/src/server/api/user_api.py index 7910693e..c6212cc7 100644 --- a/src/server/api/user_api.py +++ b/src/server/api/user_api.py @@ -33,6 +33,32 @@ def log_user_action(user, event_class, detail): except Exception as e: print(e) +def password_is_strong(password): + """ Check plain-text password against strength rules.""" + + def has_digit(test_string): + """Test if any character is a digit.""" + for c in test_string: + if c.isdigit(): + return True + return False + + def has_alpha(test_string): + """Test if any character is alphabetic.""" + for c in test_string: + if c.isalpha(): + return True + return False + + if (len(password) > 11 + # and has_alpha(password) + # and has_digit(password) + ): + return True + + else: + return False + def hash_password(password): """ Generate salt+hash for storing in db""" @@ -191,6 +217,7 @@ def user_refresh(): if is_active[0].lower() == 'y': # In the user DB and still Active? token = jwt_ops.create_token(user_name,old_jwt['role']) + log_user_action(user_name, "Success", "Refreshed token") return token else: @@ -208,7 +235,7 @@ def user_create(): Requires admin role - Form POST Parameters + Form POST JSON Parameters ---------- username : str full_name : str @@ -222,12 +249,18 @@ def user_create(): Duplicate user: 409 + DB error """ - new_user = request.form["username"] - fullname = request.form["full_name"] - userpw = request.form["password"] - user_role = request.form["role"] - requesting_user = jwt_ops.get_jwt_user() + try: + post_dict = json.loads(request.data) + new_user = post_dict["username"] + fullname = post_dict["full_name"] + userpw = post_dict["password"] + user_role = post_dict["role"] + except: + return jsonify("Missing one or more parameters"), 400 + + + requesting_user = jwt_ops.validate_decode_jwt()['sub'] pw_hash = hash_password(userpw) @@ -287,21 +320,93 @@ def get_user_count(): return jsonify(user_count[0]) -# TODO: A single do-all update_user() -@user_api.route("/api/admin/user/deactivate", methods=["POST"]) +@user_api.route("/api/admin/user/check_name", methods=["POST"]) @jwt_ops.admin_required -def user_deactivate(): - """Mark user as inactive in DB""" - # TODO - return "", 200 +def check_username(): + """Return 1 if username exists already, else 0.""" + try: + post_dict = json.loads(request.data) + test_username = post_dict["username"] + except: + return jsonify("Missing username"), 400 -@user_api.route("/api/admin/user/activate", methods=["POST"]) -@jwt_ops.admin_required -def user_activate(): - """Mark user as active in DB""" - # TODO - return "", 200 + with engine.connect() as connection: + + s = text( """select count(username) from pdp_users where username=:u """ ) + s = s.bindparams(u=test_username) + result = connection.execute(s) + + if result.rowcount: # As we're doing a count() we *should* get a result + user_exists = result.fetchone()[0] + else: + log_user_action(test_username, "Failure", "Error when checking username") + return jsonify("Error checking username"), 500 + + return jsonify(user_exists) + +@user_api.route("/api/admin/user/update", methods=["POST"]) +@jwt_ops.admin_required +def user_update(): + """Update existing user record + """ + + post_dict = json.loads(request.data) + + try: + username = post_dict["username"] + except: + return jsonify("Must specify username"), 400 + + update_dict = {} + + # Need to be a bit defensive here & select what we want instead of taking what we're given + for key in ["full_name", "active", "role", "password"]: + try: + val = post_dict[key] + update_dict[key] = val + except: + pass + + + if not update_dict: + return jsonify("No changed items specified") # If nothing to do, declare victory + + if "password" in update_dict.keys(): + + if password_is_strong(update_dict['password']): + update_dict['password'] = hash_password(update_dict['password']) + else: + return jsonify("Password too weak") + + + + # We have a variable number of columns to update. + # We could generate a text query on the fly, but this seems the perfect place to use the ORM + # and let it handle the update for us. + + from sqlalchemy import update + from sqlalchemy.orm import Session, sessionmaker + + Session = sessionmaker(engine) + + session = Session() + # #TODO: Figure out why context manager doesn't work or do try/finally + + PU = Table("pdp_users", metadata, autoload=True, autoload_with=engine) + # pr = Table("pdp_user_roles", metadata, autoload=True, autoload_with=engine) + + #TODO: Check tendered role or join roles table for update + + stmt = update(PU).where(PU.columns.username == username).values(update_dict).\ + execution_options(synchronize_session="fetch") + + result = session.execute(stmt) + + session.commit() + session.close() + + return jsonify("Updated") @user_api.route("/api/admin/user/get_users", methods=["GET"]) @@ -309,9 +414,6 @@ def user_activate(): def user_get_list(): """Return list of users""" - # pu = Table("pdp_users", metadata, autoload=True, autoload_with=engine) - # pr = Table("pdp_user_roles", metadata, autoload=True, autoload_with=engine) - with engine.connect() as connection: s = text( @@ -331,3 +433,27 @@ def user_get_list(): return jsonify(ul), 200 +@user_api.route("/api/admin/user/get_info/", methods=["GET"]) +@jwt_ops.admin_required +def user_get_info(username): + """Return info on a specified user""" + + with engine.connect() as connection: + + s = text( + """ select username, full_name, active, pr.role + from pdp_users as pu + left join pdp_user_roles as pr on pu.role = pr._id + where username=:u + """ + ) + s = s.bindparams(u=username) + result = connection.execute(s) + + if result.rowcount: + user_row = result.fetchone() + else: + log_user_action(username, "Failure", "Error when getting user info") + return jsonify("Username not found"), 400 + + return jsonify( dict(zip(result.keys(), user_row)) ), 200 \ No newline at end of file diff --git a/src/server/test_api.py b/src/server/test_api.py index 4a3c9bf9..1de33117 100644 --- a/src/server/test_api.py +++ b/src/server/test_api.py @@ -170,37 +170,35 @@ def test_admingetusers(state: State): userlist = response.json() assert len(userlist) > 1 -# Endpoints not implemented yet - -# def test_check_usernames(state: State): -# """Verify logged-in base_admin can test usernames, gets correct result - existing user """ -# # Build auth string value including token from state -# b_string = 'Bearer ' + state.state['base_admin'] +def test_check_usernames(state: State): + """Verify logged-in base_admin can test usernames, gets correct result - existing user """ + # Build auth string value including token from state + b_string = 'Bearer ' + state.state['base_admin'] -# assert len(b_string) > 24 + assert len(b_string) > 24 -# auth_hdr = {'Authorization' : b_string} + auth_hdr = {'Authorization' : b_string} -# data = {"username":"base_admin"} -# response = requests.post(SERVER_URL + "/api/admin/user/check_name", headers=auth_hdr, json=data) -# assert response.status_code == 200 + data = {"username":"base_admin"} + response = requests.post(SERVER_URL + "/api/admin/user/check_name", headers=auth_hdr, json=data) + assert response.status_code == 200 -# is_user = response.json() -# assert is_user == 1 + is_user = response.json() + assert is_user == 1 -# def test_check_badusernames(state: State): -# """Verify logged-in base_admin can test usernames, gets correct result - nonexistant user """ -# # Build auth string value including token from state -# b_string = 'Bearer ' + state.state['base_admin'] -# assert len(b_string) > 24 -# auth_hdr = {'Authorization' : b_string} +def test_check_badusernames(state: State): + """Verify logged-in base_admin can test usernames, gets correct result - nonexistant user """ + # Build auth string value including token from state + b_string = 'Bearer ' + state.state['base_admin'] + assert len(b_string) > 24 + auth_hdr = {'Authorization' : b_string} -# data = {"username":"got_no_username_like_this"} -# response = requests.post(SERVER_URL + "/api/admin/user/check_name", headers=auth_hdr, json=data) -# assert response.status_code == 200 + data = {"username":"got_no_username_like_this"} + response = requests.post(SERVER_URL + "/api/admin/user/check_name", headers=auth_hdr, json=data) + assert response.status_code == 200 -# is_user = response.json() -# assert is_user == 0 + is_user = response.json() + assert is_user == 0 def test_usergetusers(state: State): diff --git a/src/server/user_mgmt/base_users.py b/src/server/user_mgmt/base_users.py index a35bd4b3..29629d51 100644 --- a/src/server/user_mgmt/base_users.py +++ b/src/server/user_mgmt/base_users.py @@ -58,28 +58,28 @@ def create_base_users(): # TODO: Just call create_user for each # user pw_hash = user_api.hash_password(BASEUSER_PW) ins_stmt = pu.insert().values( - username="base_user", password=pw_hash, active="Y", role=1, + username="base_user", full_name="Base User", password=pw_hash, active="Y", role=1, ) connection.execute(ins_stmt) # INactive user # Reuse pw hash ins_stmt = pu.insert().values( - username="base_user_inact", password=pw_hash, active="N", role=1, + username="base_user_inact", full_name="Inactive User", password=pw_hash, active="N", role=1, ) connection.execute(ins_stmt) # editor pw_hash = user_api.hash_password(BASEEDITOR_PW) ins_stmt = pu.insert().values( - username="base_editor", password=pw_hash, active="Y", role=2, + username="base_editor", full_name="Base Editor", password=pw_hash, active="Y", role=2, ) connection.execute(ins_stmt) # admin pw_hash = user_api.hash_password(BASEADMIN_PW) ins_stmt = pu.insert().values( - username="base_admin", password=pw_hash, active="Y", role=9, + username="base_admin", full_name="Base Admin", password=pw_hash, active="Y", role=9, ) connection.execute(ins_stmt)