Merge pull request #2426 from chaoss/dev
Release 0.50.3
sgoggins authored Jun 6, 2023
2 parents 12f0962 + 037d00e commit 5560f4d
Showing 35 changed files with 897 additions and 389 deletions.
6 changes: 5 additions & 1 deletion .gitignore
@@ -192,4 +192,8 @@ pgdata/
postgres-data/

# Generated files from github
.history/
.history/sendgrid.env
sendgrid.env
*sendgrid*.env
./sendgrid.env
sendgrid.env
2 changes: 1 addition & 1 deletion augur/api/routes/application.py
@@ -12,7 +12,7 @@
import pandas as pd
from flask import request, Response, jsonify, session
from flask_login import login_user, logout_user, current_user, login_required
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.security import check_password_hash
from sqlalchemy.sql import text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound
4 changes: 2 additions & 2 deletions augur/api/routes/user.py
@@ -12,7 +12,7 @@
import pandas as pd
from flask import request, Response, jsonify, session
from flask_login import login_user, logout_user, current_user, login_required
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.security import check_password_hash
from sqlalchemy.sql import text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm.exc import NoResultFound
@@ -212,7 +212,7 @@ def update_user():
return jsonify({"status": "Email Updated"})

if new_password is not None:
current_user.login_hashword = generate_password_hash(new_password)
current_user.login_hashword = User.compute_hashsed_password(new_password)
session.commit()
session = Session()
return jsonify({"status": "Password Updated"})
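The route now delegates hashing to the User model instead of calling werkzeug directly, so the hashing scheme lives in one place. A minimal sketch of what such a classmethod could look like, assuming it simply wraps the generate_password_hash call that the routes stopped importing (only the method name comes from this diff; the body is an assumption):

from werkzeug.security import generate_password_hash

class User:
    @classmethod
    def compute_hashsed_password(cls, password: str) -> str:
        # Salt and hash the plaintext; callers store the result in login_hashword.
        return generate_password_hash(password)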
120 changes: 79 additions & 41 deletions augur/api/view/api.py
@@ -1,8 +1,11 @@
from flask import Flask, render_template, render_template_string, request, abort, jsonify, redirect, url_for, session, flash
import re
from flask_login import current_user, login_required
from augur.application.db.models import Repo
from augur.application.db.models import Repo, RepoGroup, UserGroup, UserRepo
from augur.tasks.frontend import add_org_repo_list, parse_org_and_repo_name, parse_org_name
from .utils import *
from ..server import app
from ..server import app, engine
from augur.application.db.session import DatabaseSession

@app.route('/cache/file/')
@app.route('/cache/file/<path:file>')
@@ -11,6 +14,36 @@ def cache(file=None):
return redirect(url_for('static', filename="cache"))
return redirect(url_for('static', filename="cache/" + toCacheFilename(file, False)))


def add_existing_repo_to_group(session, user_id, group_name, repo_id):

logger.info("Adding existing repo to group")

group_id = UserGroup.convert_group_name_to_id(session, user_id, group_name)
if group_id is None:
return False

result = UserRepo.insert(session, repo_id, group_id)
if not result:
return False

def add_existing_org_to_group(session, user_id, group_name, rg_id):

logger.info("Adding existing org to group")

group_id = UserGroup.convert_group_name_to_id(session, user_id, group_name)
if group_id is None:
return False

repos = session.query(Repo).filter(Repo.repo_group_id == rg_id).all()
logger.info("Length of repos in org: " + str(len(repos)))
for repo in repos:
result = UserRepo.insert(session, repo.repo_id, group_id)
if not result:
logger.info("Failed to add repo to group")



@app.route('/account/repos/add', methods = ['POST'])
@login_required
def av_add_user_repo():
@@ -33,46 +66,51 @@ def av_add_user_repo():
if group == "None":
group = current_user.login_name + "_default"

invalid_urls = []

with DatabaseSession(logger, engine) as session:
for url in urls:

# matches https://github.com/{org}/ or htts://github.com/{org}
if (org_name := Repo.parse_github_org_url(url)):
rg_obj = RepoGroup.get_by_name(session, org_name)
if rg_obj:
# add the orgs repos to the group
add_existing_org_to_group(session, current_user.user_id, group, rg_obj.repo_group_id)

# matches https://github.com/{org}/{repo}/ or htts://github.com/{org}/{repo}
elif Repo.parse_github_repo_url(url)[0]:
org_name, repo_name = Repo.parse_github_repo_url(url)
repo_git = f"https://github.com/{org_name}/{repo_name}"
repo_obj = Repo.get_by_repo_git(session, repo_git)
if repo_obj:
add_existing_repo_to_group(session, current_user.user_id, group, repo_obj.repo_id)

# matches /{org}/{repo}/ or /{org}/{repo} or {org}/{repo}/ or {org}/{repo}
elif (match := parse_org_and_repo_name(url)):
org, repo = match.groups()
repo_git = f"https://github.com/{org}/{repo}"
repo_obj = Repo.get_by_repo_git(session, repo_git)
if repo_obj:
add_existing_repo_to_group(session, current_user.user_id, group, repo_obj.repo_id)

# matches /{org}/ or /{org} or {org}/ or {org}
elif (match := parse_org_name(url)):
org_name = match.group(1)
rg_obj = RepoGroup.get_by_name(session, org_name)
logger.info(rg_obj)
if rg_obj:
# add the orgs repos to the group
add_existing_org_to_group(session, current_user.user_id, group, rg_obj.repo_group_id)

else:
invalid_urls.append(url)

added_orgs = 0
added_repos = 0
for url in urls:

# matches https://github.com/{org}/ or htts://github.com/{org}
if Repo.parse_github_org_url(url):
added = current_user.add_org(group, url)
if added:
added_orgs += 1

# matches https://github.com/{org}/{repo}/ or htts://github.com/{org}/{repo}
elif Repo.parse_github_repo_url(url)[0]:
print("Adding repo")
added = current_user.add_repo(group, url)
if added:
print("Repo added")
added_repos += 1

# matches /{org}/{repo}/ or /{org}/{repo} or {org}/{repo}/ or {org}/{repo}
elif (match := re.match(r'^\/?([a-zA-Z0-9_-]+)\/([a-zA-Z0-9_-]+)\/?$', url)):
org, repo = match.groups()
repo_url = f"https://github.com/{org}/{repo}/"
added = current_user.add_repo(group, repo_url)
if added:
added_repos += 1

# matches /{org}/ or /{org} or {org}/ or {org}
elif (match := re.match(r'^\/?([a-zA-Z0-9_-]+)\/?$', url)):
org = match.group(1)
org_url = f"https://github.com/{org}/"
added = current_user.add_org(group, org_url)
if added:
added_orgs += 1


if not added_orgs and not added_repos:
flash(f"Unable to add any repos or orgs")
else:
flash(f"Successfully added {added_repos} repos and {added_orgs} orgs")
if urls:
urls = [url.lower() for url in urls]
add_org_repo_list.si(current_user.user_id, group, urls).apply_async()

flash("Adding repos and orgs in the background")

return redirect(url_for("user_settings") + "?section=tracker")
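With this change the request handler only lower-cases the submitted URLs and queues a Celery task; the org/repo parsing and database writes now happen on a worker. For readers unfamiliar with the dispatch idiom, a minimal standalone illustration: si() builds an immutable signature (fixed arguments, no parent result passed in) and apply_async() enqueues it. The task body below is a placeholder, not Augur's real add_org_repo_list from augur.tasks.frontend:

from celery import Celery

app = Celery("example", broker="redis://localhost:6379/0")

@app.task
def add_org_repo_list(user_id, group_name, urls):
    # Placeholder body; the real task classifies each URL as an org or a repo
    # and adds it to the user's group, as the view previously did inline.
    return len(urls)

# Fire-and-forget from a request handler:
add_org_repo_list.si(42, "example_default", ["https://github.com/chaoss/augur"]).apply_async()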

99 changes: 60 additions & 39 deletions augur/application/cli/backend.py
@@ -75,6 +75,7 @@ def start(disable_collection, development, port):
if not port:
port = config.get_value("Server", "port")

worker_vmem_cap = config.get_value("Celery", 'worker_process_vmem_cap')

gunicorn_command = f"gunicorn -c {gunicorn_location} -b {host}:{port} augur.api.server:app"
server = subprocess.Popen(gunicorn_command.split(" "))
@@ -83,30 +84,18 @@ def start(disable_collection, development, port):
logger.info('Gunicorn webserver started...')
logger.info(f'Augur is running at: {"http" if development else "https"}://{host}:{port}')

scheduling_worker_process = None
core_worker_process = None
secondary_worker_process = None
celery_beat_process = None
facade_worker_process = None
if not disable_collection:

if os.path.exists("celerybeat-schedule.db"):
processes = start_celery_worker_processes(float(worker_vmem_cap), disable_collection)
time.sleep(5)
if os.path.exists("celerybeat-schedule.db"):
logger.info("Deleting old task schedule")
os.remove("celerybeat-schedule.db")

scheduling_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=2 -n scheduling:{uuid.uuid4().hex}@%h -Q scheduling"
core_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=45 -n core:{uuid.uuid4().hex}@%h"
secondary_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=10 -n secondary:{uuid.uuid4().hex}@%h -Q secondary"
facade_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=15 -n facade:{uuid.uuid4().hex}@%h -Q facade"

scheduling_worker_process = subprocess.Popen(scheduling_worker.split(" "))
core_worker_process = subprocess.Popen(core_worker.split(" "))
secondary_worker_process = subprocess.Popen(secondary_worker.split(" "))
facade_worker_process = subprocess.Popen(facade_worker.split(" "))
celery_beat_process = None
celery_command = "celery -A augur.tasks.init.celery_app.celery_app beat -l debug"
celery_beat_process = subprocess.Popen(celery_command.split(" "))

time.sleep(5)
if not disable_collection:


with DatabaseSession(logger) as session:

clean_collection_status(session)
@@ -120,10 +109,6 @@ def start(disable_collection, development, port):

augur_collection_monitor.si().apply_async()


celery_command = "celery -A augur.tasks.init.celery_app.celery_app beat -l debug"
celery_beat_process = subprocess.Popen(celery_command.split(" "))

else:
logger.info("Collection disabled")

@@ -135,21 +120,10 @@ def start(disable_collection, development, port):
logger.info("Shutting down server")
server.terminate()

if core_worker_process:
logger.info("Shutting down celery process: core")
core_worker_process.terminate()

if scheduling_worker_process:
logger.info("Shutting down celery process: scheduling")
scheduling_worker_process.terminate()

if secondary_worker_process:
logger.info("Shutting down celery process: secondary")
secondary_worker_process.terminate()

if facade_worker_process:
logger.info("Shutting down celery process: facade")
facade_worker_process.terminate()
logger.info("Shutting down all celery worker processes")
for p in processes:
if p:
p.terminate()

if celery_beat_process:
logger.info("Shutting down celery beat process")
@@ -162,6 +136,54 @@ def start(disable_collection, development, port):
except RedisConnectionError:
pass

def start_celery_worker_processes(vmem_cap_ratio, disable_collection=False):

#Calculate process scaling based on how much memory is available on the system in bytes.
#Each celery process takes ~500MB or 500 * 1024^2 bytes

process_list = []

#Cap memory usage to 30% of total virtual memory
available_memory_in_bytes = psutil.virtual_memory().total * vmem_cap_ratio
available_memory_in_megabytes = available_memory_in_bytes / (1024 ** 2)
max_process_estimate = available_memory_in_megabytes // 500

#Get a subset of the maximum procesess available using a ratio, not exceeding a maximum value
def determine_worker_processes(ratio,maximum):
return max(min(round(max_process_estimate * ratio),maximum),1)

frontend_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=1 -n frontend:{uuid.uuid4().hex}@%h -Q frontend"
max_process_estimate -= 1
process_list.append(subprocess.Popen(frontend_worker.split(" ")))

if not disable_collection:

#2 processes are always reserved as a baseline.
scheduling_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency=2 -n scheduling:{uuid.uuid4().hex}@%h -Q scheduling"
max_process_estimate -= 2
process_list.append(subprocess.Popen(scheduling_worker.split(" ")))

#60% of estimate, Maximum value of 45
core_num_processes = determine_worker_processes(.6, 45)
logger.info(f"Starting core worker processes with concurrency={core_num_processes}")
core_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={core_num_processes} -n core:{uuid.uuid4().hex}@%h"
process_list.append(subprocess.Popen(core_worker.split(" ")))

#20% of estimate, Maximum value of 25
secondary_num_processes = determine_worker_processes(.2, 25)
logger.info(f"Starting secondary worker processes with concurrency={secondary_num_processes}")
secondary_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={secondary_num_processes} -n secondary:{uuid.uuid4().hex}@%h -Q secondary"
process_list.append(subprocess.Popen(secondary_worker.split(" ")))

#15% of estimate, Maximum value of 20
facade_num_processes = determine_worker_processes(.2, 20)
logger.info(f"Starting facade worker processes with concurrency={facade_num_processes}")
facade_worker = f"celery -A augur.tasks.init.celery_app.celery_app worker -l info --concurrency={facade_num_processes} -n facade:{uuid.uuid4().hex}@%h -Q facade"

process_list.append(subprocess.Popen(facade_worker.split(" ")))

return process_list


@cli.command('stop')
def stop():
@@ -378,7 +400,6 @@ def raise_open_file_limit(num_files):

return


# def initialize_components(augur_app, disable_housekeeper):
# master = None
# manager = None
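To make the new worker scaling concrete, here is the arithmetic start_celery_worker_processes performs on a hypothetical 64 GiB host with the default worker_process_vmem_cap of 0.25 (the host size is illustrative; the real code reads psutil.virtual_memory().total, and note the facade comment says 15% while the code passes a ratio of .2):

total_bytes = 64 * 1024**3                                 # hypothetical 64 GiB host
vmem_cap_ratio = 0.25                                      # default worker_process_vmem_cap
available_mb = (total_bytes * vmem_cap_ratio) / 1024**2    # 16384.0 MB usable by workers
max_process_estimate = available_mb // 500                 # 32 processes at ~500 MB each

max_process_estimate -= 1   # frontend worker (concurrency=1)
max_process_estimate -= 2   # scheduling worker (concurrency=2)

core      = max(min(round(max_process_estimate * 0.6), 45), 1)   # 17
secondary = max(min(round(max_process_estimate * 0.2), 25), 1)   # 6
facade    = max(min(round(max_process_estimate * 0.2), 20), 1)   # 6
print(core, secondary, facade)                                    # 17 6 6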
22 changes: 19 additions & 3 deletions augur/application/cli/user.py
@@ -8,7 +8,6 @@
import os
import click
import logging
from werkzeug.security import generate_password_hash
from augur.application.db.models import User
from augur.application.db.engine import DatabaseEngine
from sqlalchemy.orm import sessionmaker
@@ -48,7 +47,7 @@ def add_user(username, email, firstname, lastname, admin, phone_number, password

user = session.query(User).filter(User.login_name == username).first()
if not user:
password = generate_password_hash(password)
password = User.compute_hashsed_password(password)
new_user = User(login_name=username, login_hashword=password, email=email, text_phone=phone_number, first_name=firstname, last_name=lastname, admin=admin, tool_source="User CLI", tool_version=None, data_source="CLI")
session.add(new_user)
session.commit()
@@ -59,4 +58,21 @@ def add_user(username, email, firstname, lastname, admin, phone_number, password
session.close()
engine.dispose()

return 0
return 0

@cli.command('password_reset', short_help="Reset a user's password")
@click.argument("username")
@click.password_option(help="New password")
def reset_password(username, password):
session = Session()

user = session.query(User).filter(User.login_name == username).first()

if not user:
return click.echo("invalid username")

password = User.compute_hashsed_password(password)
user.login_hashword = password
session.commit()

return click.echo("Password updated")
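A quick way to exercise the new password_reset command without a TTY, for example in a test. This assumes the click group defined in augur/application/cli/user.py is importable as cli; click.password_option() prompts twice (hidden input plus confirmation) when --password is not passed, hence the doubled input line:

from click.testing import CliRunner
from augur.application.cli.user import cli

runner = CliRunner()
result = runner.invoke(cli, ["password_reset", "some_existing_user"], input="new-secret\nnew-secret\n")

# result.output contains the password prompts followed by either
# "Password updated" or "invalid username".
print(result.output)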
4 changes: 3 additions & 1 deletion augur/application/config.py
@@ -37,6 +37,7 @@ def get_development_flag():
"github": "<gh_api_key>",
"gitlab": "<gl_api_key>"
},
#TODO: a lot of these are deprecated.
"Facade": {
"check_updates": 1,
"create_xlsx_summary_files": 1,
@@ -66,7 +67,8 @@ def get_development_flag():
"log_level": "INFO",
},
"Celery": {
"concurrency": 12
"worker_process_vmem_cap": 0.25,
"refresh_materialized_views_interval_in_days": 7
},
"Redis": {
"cache_group": 0,
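The flat "concurrency": 12 default gives way to two settings: worker_process_vmem_cap, which drives the memory-based worker scaling added in augur/application/cli/backend.py above, and refresh_materialized_views_interval_in_days. A trivial sketch of the new defaults as a plain dict (the running code reads them through config.get_value, as shown in the backend.py diff):

celery_defaults = {
    "worker_process_vmem_cap": 0.25,   # workers may use up to 25% of system memory
    "refresh_materialized_views_interval_in_days": 7,
}

print(f"Celery workers capped at {celery_defaults['worker_process_vmem_cap']:.0%} of system memory")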