Skip to content

Commit

Permalink
Merge pull request #678 from DiamondLightSource/profiler_upg
Browse files Browse the repository at this point in the history
Profiler upgrade
  • Loading branch information
dkazanc authored Jul 3, 2020
2 parents 7fa7a7d + 0bc30cb commit 483916a
Show file tree
Hide file tree
Showing 11 changed files with 324 additions and 207 deletions.
36 changes: 21 additions & 15 deletions savu/core/plugin_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
"""

import logging
import numpy as np

import savu.core.utils as cu
import savu.plugins.utils as pu
Expand All @@ -46,9 +45,9 @@ def __init__(self, options, name='PluginRunner'):

def _run_plugin_list(self):
""" Create an experiment and run the plugin list.
"""
"""
self.exp._setup(self)

plugin_list = self.exp.meta_data.plugin_list
logging.info('Running the plugin list check')
self._run_plugin_list_setup(plugin_list)
Expand All @@ -63,13 +62,20 @@ def _run_plugin_list(self):
cp = self.exp.checkpoint
for i in range(cp.get_checkpoint_plugin(), n_plugins):
self.exp._set_experiment_for_current_plugin(i)
self.__run_plugin(exp_coll['plugin_dict'][i])
# end the plugin run if savu has been killed
memory_before = cu.get_memory_usage_linux()

plugin_name = self.__run_plugin(exp_coll['plugin_dict'][i])

self.exp._barrier(msg='PluginRunner: plugin complete.')

memory_after = cu.get_memory_usage_linux()
logging.debug("{} memory usage before: {} MB, after: {} MB, change: {} MB".format(
plugin_name, memory_before, memory_after, memory_after - memory_before))

# ********* transport functions ***********
# end the plugin run if savu has been killed
if self._transport_kill_signal():
self._transport_cleanup(i+1)
self._transport_cleanup(i + 1)
break
self.exp._barrier(msg='PluginRunner: No kill signal... continue.')
cp.output_plugin_checkpoint()
Expand All @@ -91,12 +97,12 @@ def _run_plugin_list(self):

def __output_final_message(self):
    """Print the closing banner for the plugin-list run.

    Chooses between the normal completion message and the kill-signal
    message based on whether a 'killsignal' entry was added to the
    experiment metadata while the plugin list was running.
    """
    # Idiom fix: membership test on the dict itself replaces
    # "in d.keys()", and the redundant "True if ... else False" is gone.
    kill = 'killsignal' in self.exp.meta_data.get_dictionary()
    msg = "interrupted by killsignal" if kill else "Complete"
    # banner width matches the longer kill-signal message
    stars = 40 if kill else 23
    cu.user_message("*" * stars)
    cu.user_message("* Processing " + msg + " *")
    cu.user_message("*" * stars)

def __run_plugin(self, plugin_dict):
plugin = self._transport_load_plugin(self.exp, plugin_dict)
Expand All @@ -121,6 +127,7 @@ def __run_plugin(self, plugin_dict):
self._transport_terminate_dataset(data)

self.exp._reorganise_datasets(finalise)
return plugin.name

def _run_plugin_list_setup(self, plugin_list):
""" Run the plugin list through the framework without executing the
Expand All @@ -137,11 +144,11 @@ def _run_plugin_list_setup(self, plugin_list):
for i in range(n_loaders):
pu.plugin_loader(self.exp, plist[i])
self.exp._set_initial_datasets()

# run all plugin setup methods and store information in experiment
# collection
count = 0
for plugin_dict in plist[n_loaders:n_loaders+n_plugins]:
for plugin_dict in plist[n_loaders:n_loaders + n_plugins]:
plugin = pu.plugin_loader(self.exp, plugin_dict, check=True)
plugin._revert_preview(plugin.get_in_datasets())
plugin_dict['cite'] = plugin.get_citation_information()
Expand All @@ -158,7 +165,6 @@ def _run_plugin_list_setup(self, plugin_list):
# ********* transport function ***********
self._transport_update_plugin_list()


def __check_gpu(self):
""" Check if the process list contains GPU processes and determine if
GPUs exists. Add GPU processes to the processes list if required."""
Expand All @@ -167,7 +173,7 @@ def __check_gpu(self):

try:
import pynvml as pv
except:
except Exception:
logging.debug("pyNVML module not found")
raise Exception("pyNVML module not found")

Expand All @@ -190,8 +196,8 @@ def __set_gpu_processes(self, count):
processes = self.exp.meta_data.get('processes')
if not [i for i in processes if 'GPU' in i]:
logging.debug("GPU processes missing. GPUs found so adding them.")
cpus = ['CPU'+str(i) for i in range(count)]
gpus = ['GPU'+str(i) for i in range(count)]
cpus = ['CPU' + str(i) for i in range(count)]
gpus = ['GPU' + str(i) for i in range(count)]
for i in range(min(count, len(processes))):
processes[processes.index(cpus[i])] = gpus[i]
self.exp.meta_data.set('processes', processes)
46 changes: 39 additions & 7 deletions savu/core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.


"""
.. module:: utils
:platform: Unix
Expand All @@ -20,14 +21,18 @@
"""

from __future__ import print_function, division

import itertools
import logging
import logging.handlers as handlers
import itertools

from mpi4py import MPI


def logfunction(func):
""" Decorator to add logging information around calls for use with . """

def _wrapper(*args, **kwds):
logging.info("Start::%s:%s",
func.__module__,
Expand All @@ -37,11 +42,13 @@ def _wrapper(*args, **kwds):
func.__module__,
func.__name__)
return returnval

return _wrapper


def logmethod(func):
""" Decorator to add logging information around calls for use with . """

def _wrapper(self, *args, **kwds):
logging.info("Start::%s.%s:%s",
func.__module__,
Expand All @@ -53,14 +60,17 @@ def _wrapper(self, *args, **kwds):
self.__class__.__name__,
func.__name__)
return returnval

return _wrapper


def docstring_parameter(*sub):
    """Decorator factory that substitutes values into a docstring.

    The decorated object's ``__doc__`` is treated as a ``str.format``
    template and formatted with ``*sub``.

    :param sub: positional values substituted into the docstring.
    :returns: a decorator that formats and re-assigns ``__doc__``.
    """
    def dec(obj):
        # Robustness fix: __doc__ is None when the object has no
        # docstring or when running under 'python -OO'; formatting
        # would then raise, so leave the object untouched instead.
        if obj.__doc__ is not None:
            obj.__doc__ = obj.__doc__.format(*sub)
        return obj
    return dec


Expand Down Expand Up @@ -185,17 +195,17 @@ def _send_email(address):
you = address
# Open a plain text file for reading. For this example, assume that
# the text file contains only ASCII characters.
# fp = open(textfile, 'rb')
# # Create a text/plain message
# msg = MIMEText(fp.read())
# fp.close()
# fp = open(textfile, 'rb')
# # Create a text/plain message
# msg = MIMEText(fp.read())
# fp.close()

# me == the sender's email address
# you == the recipient's email address
msg['Subject'] = 'Your Savu job has completed'
msg['From'] = me
msg['To'] = you

# Send the message via our own SMTP server, but don't include the
# envelope header.
s = smtplib.SMTP('localhost')
Expand All @@ -209,6 +219,28 @@ def _savu_encoder(data):

def _savu_decoder(data):
if isinstance(data, str) and len(data.split('#savu_encoded#')) > 1:
exec('data = ' + data.split('#savu_encoded#')[-1])
exec ('data = ' + data.split('#savu_encoded#')[-1])
return data
return data


def get_memory_usage_linux(kb=False, mb=True):
    """Return the peak resident-set-size memory usage of this process.

    :param bool kb: return the value in kilobytes (takes precedence).
    :param bool mb: return the value in megabytes (the default); kept
        for backward compatibility with existing callers.
    :returns: peak memory usage as an integer, or 0 when the
        'resource' module is unavailable (e.g. on Windows).
    :rtype: int
    """
    try:
        # Windows doesn't seem to have the resource package, so this
        # will silently fail.
        import resource as res
    except ImportError:
        # Bug fix: this previously returned the tuple (0, 0) while the
        # normal paths return an int, breaking callers that do
        # arithmetic on the result.  Return a plain int instead.
        return 0

    # NOTE(review): on Linux ru_maxrss is reported in kilobytes --
    # confirm if other Unix platforms must be supported.
    peak_kb = int(res.getrusage(res.RUSAGE_SELF).ru_maxrss)
    if kb:
        return peak_kb
    # Bug fix: previously both flags False returned None implicitly;
    # megabytes is now the unconditional fallback unit.
    return peak_kb // 1024
38 changes: 19 additions & 19 deletions savu/data/experiment_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
.. moduleauthor:: Nicola Wadeson <scientificsoftware@diamond.ac.uk>
"""
from __future__ import print_function, division, absolute_import

import os
import copy
import h5py
import logging
from mpi4py import MPI

import savu.plugins.utils as pu
from savu.data.meta_data import MetaData
from savu.data.plugin_list import PluginList
from savu.data.data_structures.data import Data
Expand Down Expand Up @@ -63,7 +63,7 @@ def __meta_data_setup(self, process_file):
self.meta_data.plugin_list = PluginList()
try:
rtype = self.meta_data.get('run_type')
if rtype is 'test':
if rtype == 'test':
self.meta_data.plugin_list.plugin_list = \
self.meta_data.get('plugin_list')
else:
Expand Down Expand Up @@ -125,7 +125,7 @@ def __set_system_params(self):
# look in conda environment to see which version is being used
savu_path = sys.modules['savu'].__path__[0]
sys_files = os.path.join(
os.path.dirname(savu_path), 'system_files')
os.path.dirname(savu_path), 'system_files')
subdirs = os.listdir(sys_files)
sys_folder = 'dls' if len(subdirs) > 1 else subdirs[0]
fname = 'system_parameters.yml'
Expand All @@ -145,7 +145,7 @@ def _check_checkpoint(self):
def _add_input_data_to_nxs_file(self, transport):
# save the loaded data to file
h5 = Hdf5Utils(self)
for name, data in self.index['in_data'].iteritems():
for name, data in self.index['in_data'].items():
self.meta_data.set(['link_type', name], 'input_data')
self.meta_data.set(['group_name', name], name)
self.meta_data.set(['filename', name], data.backing_file)
Expand Down Expand Up @@ -204,36 +204,36 @@ def _set_nxs_file(self):
log_folder.close()

self._create_nxs_entry()

def _create_nxs_entry(self):
    """Create the top-level NXentry group in the output nexus file.

    Only the last-indexed process (process == len(processes) - 1)
    writes the file, so the entry group is created exactly once.
    """
    logging.debug("Testing nexus file")
    import h5py
    last_rank = len(self.meta_data.get('processes')) - 1
    if self.meta_data.get('process') == last_rank:
        fname = self.meta_data.get('nxs_filename')
        with h5py.File(fname, 'w') as nxs_file:
            entry = nxs_file.create_group('entry')
            entry.attrs['NX_class'] = 'NXentry'

def _clear_data_objects(self):
self.index["out_data"] = {}
self.index["in_data"] = {}

def _merge_out_data_to_in(self):
for key, data in self.index["out_data"].iteritems():
for key, data in self.index["out_data"].items():
if data.remove is False:
self.index['in_data'][key] = data
self.index["out_data"] = {}

def _finalise_experiment_for_current_plugin(self):
finalise = {}
finalise = {'remove': [], 'keep': []}
# populate nexus file with out_dataset information and determine which
# datasets to remove from the framework.
finalise['remove'] = []
finalise['keep'] = []

for key, data in self.index['out_data'].iteritems():
thelist = 'remove' if data.remove else 'keep'
finalise[thelist].append(data)
for key, data in self.index['out_data'].items():
if data.remove is True:
finalise['remove'].append(data)
else:
finalise['keep'].append(data)

# find in datasets to replace
finalise['replace'] = []
Expand All @@ -252,7 +252,7 @@ def _reorganise_datasets(self, finalise):
del self.index["out_data"][data.data_info.get('name')]

# Add remaining output datasets to input datasets
for name, data in self.index['out_data'].iteritems():
for name, data in self.index['out_data'].items():
data.get_preview().set_preview([])
self.index["in_data"][name] = copy.deepcopy(data)
self.index['out_data'] = {}
Expand All @@ -262,7 +262,7 @@ def __unreplicate_data(self):
from savu.data.data_structures.data_types.replicate import Replicate
for in_data in in_data_list.values():
if isinstance(in_data.data, Replicate):
in_data.data = in_data.data.reset()
in_data.data = in_data.data._reset()

def _set_all_datasets(self, name):
data_names = []
def log(self, log_tag, log_level=logging.DEBUG):
    """
    Log the contents of the experiment at the specified level.

    :param log_tag: label identifying this log snapshot.
    :param log_level: logging level to emit at (default DEBUG).
    """
    logging.log(log_level, "Experimental Parameters for %s", log_tag)
    for key, value in self.index["in_data"].items():
        logging.log(log_level, "in data (%s) shape = %s", key,
                    value.get_shape())
    # Bug fix: this loop previously iterated "in_data" again while
    # labelling its output "out data"; iterate "out_data" instead.
    for key, value in self.index["out_data"].items():
        logging.log(log_level, "out data (%s) shape = %s", key,
                    value.get_shape())
2 changes: 1 addition & 1 deletion savu/test/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ def cleanup(options):
Performs folders cleaning in tmp/.
using _shutil_ module in order to delete everything recursively
"""
shutil.rmtree(options["out_path"])
shutil.rmtree(options["out_path"], ignore_errors=True)
"""
classb = savu.test.base_checkpoint_test.BaseCheckpointTest()
cp_folder = os.path.join(options["out_path"], 'checkpoint')
Expand Down
5 changes: 2 additions & 3 deletions savu/tomo_recon.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ def _set_options(args):
basename = os.path.basename(args.in_file)
options['datafile_name'] = os.path.splitext(basename)[0] if basename \
else args.in_file.split(os.sep)[-2]

inter_folder_path = __create_output_folder(args.tmp, out_folder_name)\
if args.tmp else out_folder_path

options['inter_path'] = inter_folder_path
options['log_path'] = args.log if args.log else options['inter_path']
options['nProcesses'] = len(options["process_names"].split(','))
Expand Down Expand Up @@ -226,4 +226,3 @@ def main(input_args=None):

if __name__ == '__main__':
main()

Loading

0 comments on commit 483916a

Please sign in to comment.