Merge pull request #51 from catalyst/catalyst-fix-tests
Fixes for import/export and unit tests for local API
ilyatregubov authored Apr 1, 2022
2 parents 3409c01 + 51eb781 commit b0bb3d9
Showing 19 changed files with 520 additions and 124 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
@@ -15,7 +15,7 @@ jobs:
        python-version: 3.7
    - name: Install dependencies
      run: |
-        python -m pip install --upgrade pip
+        python3 -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Lint with flake8
      run: |
@@ -27,4 +27,4 @@ jobs:
    - name: Test with pytest
      run: |
        pip install pytest
-        PYTHONPATH=. pytest
+        PYTHONPATH=. MOODLE_MLBACKEND_RUN_SLOW_TESTS=1 pytest
27 changes: 17 additions & 10 deletions moodlemlbackend/model/tensor.py
@@ -15,20 +15,23 @@ class TF(object):
     """Tensorflow classifier"""
 
     def __init__(self, n_features, n_classes, n_epoch, batch_size,
-                 tensor_logdir, initial_weights=False):
+                 tensor_logdir, size_hint):
 
         self.n_epoch = n_epoch
         self.batch_size = batch_size
         self.n_features = n_features
 
         # Based on the number of features although we need a reasonable
         # minimum.
-        self.n_hidden = max(4, int(n_features / 3))
-        self.n_hidden_layers = 2
+        if size_hint > 10:
+            self.n_hidden = max(8, int(n_features / 3))
+            self.n_hidden_layers = 2
+        else:
+            self.n_hidden = 10
+            self.n_hidden_layers = 1
 
         self.n_classes = n_classes
         self.tensor_logdir = tensor_logdir
 
-        self.build_graph(initial_weights)
+        self.build_graph()
 
     def __getstate__(self):
         state = self.__dict__.copy()
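
For orientation only (not part of this commit): a minimal sketch of how the reworked constructor is called once initial_weights is gone and size_hint is required. The numbers are hypothetical; per the change above, a size_hint over 10 selects the larger two-hidden-layer network, anything smaller the single-layer one.

    from moodlemlbackend.model import tensor

    # Hypothetical sizes, purely for illustration.
    clf = tensor.TF(n_features=30, n_classes=2, n_epoch=100, batch_size=500,
                    tensor_logdir='/tmp/tensorlog', size_hint=250)
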
@@ -50,7 +53,7 @@ def set_tensor_logdir(self, tensor_logdir):
 
         self.tensor_logdir = tensor_logdir
 
-    def build_graph(self, initial_weights=False):
+    def build_graph(self):
         """Builds the computational graph without feeding any data in"""
         tf.compat.v1.reset_default_graph()
         tf.keras.backend.clear_session()
@@ -84,7 +87,7 @@ def load(self, path):
         self.model = tf.keras.models.load_model(path)
         self.compile()
 
-    def fit(self, X, y, log_run=True):
+    def fit(self, X, y, log_run=True, debug=False):
         """Fit the model to the provided data"""
         y = preprocessing.MultiLabelBinarizer().fit_transform(
             y.reshape(len(y), 1))
@@ -101,11 +104,15 @@ def fit(self, X, y, log_run=True):
             )
             kwargs['callbacks'] = [cb]
 
+        if debug:
+            kwargs['verbose'] = 2
+            kwargs['validation_split'] = 0.05
+        else:
+            kwargs['verbose'] = 0
+
         history = self.model.fit(X, y,
                                  self.batch_size,
                                  self.n_epoch,
-                                 verbose=2,
-                                 validation_split=0.1, # XXX
                                  **kwargs
                                  )
 
110 changes: 56 additions & 54 deletions moodlemlbackend/processor/estimator.py
@@ -41,6 +41,7 @@
 EXPORT_MODEL_FILENAME = 'model.json'
 
 TARGET_BATCH_SIZE = 1000
+DEBUG_MODE = False
 
 
 class Estimator(object):
@@ -100,7 +101,22 @@ def load_classifier(self, model_dir=False):
             model_dir, PERSIST_FILENAME)
 
         classifier = joblib.load(classifier_filepath)
-        self.variable_columns = getattr(classifier, 'variable_columns', None)
+        for attr in ('variable_columns',
+                     'n_classes',
+                     'n_features'):
+            val = getattr(classifier, attr, None)
+            old = getattr(self, attr, None)
+            if old is not None:
+                _val = val
+                # we need to do a dance to compare numpy arrays
+                if isinstance(val, np.ndarray):
+                    _val = list(val)
+                    old = list(old)
+                if _val != old:
+                    logging.warning(f"loaded classifier has {attr} {_val}, "
+                                    f"existing value is {old}. This is bad.")
+            setattr(self, attr, val)
+
         path = os.path.join(model_dir, 'model.ckpt')
         classifier.load(path)
 
@@ -219,42 +235,51 @@ def __init__(self, modelid, directory, dataset=None):
         self.tensor_logdir = self.get_tensor_logdir()
         os.makedirs(self.tensor_logdir, exist_ok=True)
 
-    def get_classifier(self, X, y, initial_weights=False):
+    def get_classifier(self, X, y):
         """Gets the classifier"""
 
-        try:
-            n_rows = X.shape[0]
-        except AttributeError:
-            # No X during model import.
-            # n_rows value does not really matter during import.
-            n_rows = 1
+        n_rows, n_features = X.shape
+
+        # size_hint helps decide how many hidden nodes the network
+        # should have. Currently it is mostly based on the number of
+        # unique examples: the more you have, the larger the network
+        # *can* be.
+        #
+        # We also mix in the number of features: more features
+        # probably wants a bigger network, but if we don't have the
+        # examples we can't justify it.
+        if n_features < 1:
+            size_hint = 0
+        else:
+            size_hint = len(np.unique(X, axis=0)) * (1 + math.log(n_features))
 
         n_batches = (n_rows + TARGET_BATCH_SIZE - 1) // TARGET_BATCH_SIZE
         n_batches = min(n_batches, 10)
         batch_size = (n_rows + n_batches - 1) // n_batches
 
         # the number of epochs can be smaller if we have a large
-        # number of samples. On the other hand it must also be small
-        # if we have very few samples, or the model will overfit. What
-        # we can say is that with larger batches we need more epochs.
-        n_epoch = 40 + batch_size // 20
+        # number of samples. On the other hand it *should* also be
+        # small if we have *very* few samples, or the model will
+        # overfit. Unfortunately, we would rather overfit than give
+        # vague answers.
+        n_epoch = 80 + (1000000 // (n_rows + 1000))
 
         n_classes = self.n_classes
-        n_features = X.shape[1]
 
         return tensor.TF(n_features, n_classes, n_epoch, batch_size,
                          self.get_tensor_logdir(),
-                         initial_weights=initial_weights)
+                         size_hint=size_hint)
 
     def train(self, X_train, y_train, classifier=False, log_run=True):
         """Train the classifier with the provided training data"""
 
         if classifier is False:
             # Init the classifier.
             classifier = self.get_classifier(X_train, y_train)
 
         # Fit the training set. y should be an array-like.
-        classifier.fit(X_train, y_train[:, 0], log_run=log_run)
+        classifier.fit(X_train, y_train[:, 0],
+                       log_run=log_run,
+                       debug=DEBUG_MODE)
         self.store_classifier(classifier)
         # Returns the trained classifier.
         return classifier
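
To make the new sizing arithmetic in get_classifier concrete, here is a small standalone sketch (not from the patch) that reproduces the batch-size, epoch and size_hint formulas for a hypothetical training set of 5,000 rows and 30 features:

    import math
    import numpy as np

    TARGET_BATCH_SIZE = 1000        # same constant as estimator.py

    n_rows, n_features = 5000, 30   # hypothetical dataset shape

    # At most 10 batches of roughly TARGET_BATCH_SIZE rows (ceiling division).
    n_batches = min((n_rows + TARGET_BATCH_SIZE - 1) // TARGET_BATCH_SIZE, 10)
    batch_size = (n_rows + n_batches - 1) // n_batches     # -> 1000

    # More rows means fewer epochs; tiny datasets get many epochs.
    n_epoch = 80 + (1000000 // (n_rows + 1000))            # -> 246

    # size_hint grows with the number of unique rows and, slowly, with features.
    X = np.random.randint(0, 2, size=(n_rows, n_features))
    size_hint = len(np.unique(X, axis=0)) * (1 + math.log(n_features))
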
@@ -399,8 +424,6 @@ def evaluate_dataset(self, filepath, min_score=0.6,
         # Return results.
         result = self.get_evaluation_results(min_score, accepted_deviation)
 
-        print("score: " + str(result['score']))
-
         # Add the run id to identify it in the caller.
         result['runid'] = int(self.get_runid())
 
@@ -576,48 +599,27 @@ def load_classifier(self, model_dir=False):
 
         return classifier
 
-    def export_classifier(self, exporttmpdir):
-        if self.classifier_exists():
+    def export_classifier(self, exportdir):
+        """Save the classifier to an external path"""
+        try:
             classifier = self.load_classifier()
-        else:
+        except FileNotFoundError as e:
+            logging.warning(f"could not export: {e}")
             return False
 
-        export_vars = {}
-
-        # Get all the variables in in initialise-vars scope.
-        sess = classifier.get_session()
-        for var in tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,
-                                     scope='initialise-vars'):
-            # Converting to list as numpy arrays can't be serialised.
-            export_vars[var.op.name] = var.eval(sess).tolist()
-
-        # Append the number of features.
-        export_vars['n_features'] = classifier.n_features
-        export_vars['n_classes'] = classifier.n_classes
-
-        vars_file_path = os.path.join(exporttmpdir, EXPORT_MODEL_FILENAME)
-        with open(vars_file_path, 'w') as vars_file:
-            json.dump(export_vars, vars_file)
-
-        return exporttmpdir
+        path = os.path.join(exportdir, 'model.ckpt')
+        classifier.save(path)
+        pickle_name = os.path.join(exportdir, PERSIST_FILENAME)
+        joblib.dump(classifier, pickle_name)
+        return exportdir
 
     def import_classifier(self, importdir):
-        model_vars_filepath = os.path.join(importdir,
-                                           EXPORT_MODEL_FILENAME)
-
-        with open(model_vars_filepath) as vars_file:
-            import_vars = json.load(vars_file)
-
-        self.n_features = import_vars['n_features']
-        if "n_classes" in import_vars:
-            self.n_classes = import_vars['n_classes']
-        else:
-            self.n_classes = 2
-
-        classifier = self.get_classifier(False, False,
-                                         initial_weights=import_vars)
-
+        """Load a previously exported classifier, storing it as this model id.
+
+        Note: this overwrites any classifier that already exists with
+        the same model id.
+        """
+        classifier = self.load_classifier(importdir)
         self.store_classifier(classifier)
 
     def classifier_exists(self):
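
As a rough usage sketch (not in the patch): the reworked export writes the TensorFlow checkpoint plus the joblib pickle into the target directory, and import simply loads and re-stores that classifier. Here model_a and model_b stand for two already-constructed estimator objects; the names are placeholders.

    import tempfile

    with tempfile.TemporaryDirectory() as exportdir:
        if model_a.export_classifier(exportdir):    # writes model.ckpt + the pickle
            model_b.import_classifier(exportdir)    # overwrites model_b's stored classifier
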
12 changes: 6 additions & 6 deletions moodlemlbackend/webapp/localfs.py
@@ -60,16 +60,16 @@ def __init__(self, storage, fetch_model, push_model):
 
         if "MOODLE_MLBACKEND_PYTHON_DIR" not in os.environ:
             raise IOError(
-                'Set env MOODLE_MLBACKEND_PYTHON_DIR to an existing dir')
+                'Set the MOODLE_MLBACKEND_PYTHON_DIR environment variable '
+                'to a writeable MLBackend working directory.')
 
         localbasedir = os.environ["MOODLE_MLBACKEND_PYTHON_DIR"]
 
-        if not os.path.exists(localbasedir):
+        if not (os.path.isdir(localbasedir) and os.access(localbasedir, os.W_OK)):
             raise IOError(
-                'The base dir does not exist. ' +
-                'Set env MOODLE_MLBACKEND_PYTHON_DIR to an existing dir')
-
-        os.access(localbasedir, os.W_OK)
+                f'{localbasedir} is not a writeable directory.\n' +
+                'Set the MOODLE_MLBACKEND_PYTHON_DIR environment variable '
+                'to the MLBackend working directory.')
 
         storage.set_localbasedir(localbasedir)
 
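
A minimal sketch (assumed, not from the patch) of preparing the working directory so the stricter check passes before the web app starts; the path is hypothetical:

    import os

    workdir = '/var/lib/mlbackend'            # hypothetical location
    os.makedirs(workdir, exist_ok=True)       # must exist *and* be writeable
    os.environ['MOODLE_MLBACKEND_PYTHON_DIR'] = workdir
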
20 changes: 9 additions & 11 deletions moodlemlbackend/webapp/util.py
@@ -2,8 +2,7 @@
 import os
 import zipfile
 import tempfile
-import shutil
-import atexit
+from contextlib import contextmanager
 
 from flask import request
 
@@ -23,17 +22,16 @@ def get_request_value(key, pattern=False, exception=True):
     return re.sub(pattern, '', value)
 
 
+@contextmanager
 def get_file_path(localbasedir, filekey):
 
     file = request.files[filekey]
 
-    tempdir = tempfile.mkdtemp()
-    tempfilepath = os.path.join(tempdir, filekey)
-
-    atexit.register(shutil.rmtree, tempdir)
+    tempdir = tempfile.TemporaryDirectory()
+    tempfilepath = os.path.join(tempdir.name, filekey)
     file.save(tempfilepath)
-
-    return tempfilepath
+    try:
+        yield tempfilepath
+    finally:
+        tempdir.cleanup()
 
 
@@ -43,6 +41,6 @@ def zipdir(dirpath, zipf):
     for root, dirs, files in os.walk(dirpath):
         for file in files:
             abspath = os.path.join(root, file)
-            ziph.write(abspath, os.path.relpath(abspath, root))
+            ziph.write(abspath, os.path.relpath(abspath, dirpath))
     ziph.close()
     return ziph
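
Because get_file_path is now a context manager, callers have to switch to a with block; the temporary directory holding the uploaded file is removed when the block exits. A hedged sketch of the call pattern, with a hypothetical handler and file key:

    import os
    from moodlemlbackend.webapp.util import get_file_path

    def handle_import(localbasedir):
        # 'importzip' is a placeholder file key, purely for illustration.
        with get_file_path(localbasedir, 'importzip') as filepath:
            size = os.path.getsize(filepath)   # work with the saved upload here
        # the temporary directory has already been cleaned up at this point
        return size
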
5 changes: 3 additions & 2 deletions requirements.txt
@@ -1,8 +1,9 @@
-tensorflow>=2.4.2,<2.5
+tensorflow>=2.4.2
 sklearn
 numpy>=1.19.2,<1.20
 matplotlib>=3.0,<3.4
 boto3>=1.9.0,<1.10
 flask>=1.0.2,<2.0.0
 pytest-flask
-joblib>=0.13.0,<0.14
+joblib>=0.13.0
+markupsafe<2.1.0
36 changes: 29 additions & 7 deletions test/decode-request
@@ -1,16 +1,15 @@
 #!/usr/bin/python3
-import pickle, sys
+import sys
 from pprint import pprint
-from collections import Counter
-from io import BytesIO
+from collections import Counter, defaultdict
 import argparse
 import json
 
 import numpy as np
 
 from stash import read_pickle, get_boundary, split_body
 
-parser = argparse.ArgumentParser(description='Anonymise a stashed request')
+parser = argparse.ArgumentParser(description='Examine a stashed request')
 parser.add_argument('-C', '--column-names', action='store_true',
                     help='show column names')
 parser.add_argument('-H', '--http-headers', action='store_true',
@@ -19,10 +18,12 @@ parser.add_argument('-R', '--data-rows', const=2, type=int, nargs='?',
                     help='show n rows of data')
 parser.add_argument('-V', '--column-variance', action='store_true',
                     help='show column variance')
+parser.add_argument('-S', '--row-statistics', action='store_true',
+                    help='count unique and contradictory rows')
 parser.add_argument('-J', '--to-json', action='store_true',
                     help='output the pickle as json and exit')
 parser.add_argument('file',
-                    help='file to anonymize')
+                    help='file to decode')
 args = parser.parse_args()
 
 a = read_pickle(args.file)
@@ -111,10 +112,31 @@ for k, v in parts.items():
         print()
         print(np.var(samples, 0))
         print()
-        print('index of columns with no-zero variance')
+        print('index of columns with non-zero variance')
         print(variable_samples)
 
 
+    if args.row_statistics:
+        print()
+        print(f'\033[01;33mrow statistics\033[00m')
+        print()
+        rc = Counter(str(x) for x in samples)
+        print("most common rows")
+        for k, v in rc.most_common(10):
+            print(f'{v:4} {k[:80]}')
+
+        answers = defaultdict(list)
+        for x in samples:
+            answers[str(x[:-1])].append(x[-1])
+        contradictions = {}
+        for k, v in answers.items():
+            if len(set(v)) != 1:
+                contradictions[k] = Counter(v)
+        if contradictions:
+            print("contradictory answers:")
+            for k, v in contradictions.items():
+                print(f"{k[:100]}:")
+                for kk, vv in v.most_common():
+                    print(f' {kk} × {vv}')
+
     else:
         print(body)
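
For clarity, the new -S/--row-statistics option reports "contradictory" rows, i.e. identical feature vectors that appear with more than one label. A toy illustration of the same idea (not from the patch):

    from collections import Counter, defaultdict
    import numpy as np

    # Toy samples: last column is the label, the rest are features.
    samples = np.array([[1, 0, 1],
                        [1, 0, 0],   # same features as the row above, different label
                        [0, 1, 1]])

    answers = defaultdict(list)
    for x in samples:
        answers[str(x[:-1])].append(x[-1])

    contradictions = {k: Counter(v)
                      for k, v in answers.items() if len(set(v)) > 1}
    print(contradictions)   # the [1 0] feature vector carries both labels
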