Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add memory monitoring to livereduce #15

Merged
merged 3 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ dependencies:
- pyinotify
- pre-commit
- python-build
- psutil
35 changes: 31 additions & 4 deletions scripts/livereduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
import os
import signal
import sys
import threading
import time
from hashlib import md5

import mantid # for clearer error message
import psutil
import pyinotify
from mantid.simpleapi import StartLiveData
from mantid.simpleapi import StartLiveData, mtd
from mantid.utils.logging import log_to_python as mtd_log_to_python
from packaging.version import parse as parse_version

CONVERSION_FACTOR_BYTES_TO_MB = 1.0 / (1024 * 1024)

# ##################
# configure logging
# ##################
Expand Down Expand Up @@ -79,6 +83,13 @@ def stop(cls):
else:
cls.logger.info("mantid not initialized - nothing to cleanup")

def restart_and_clear(self):
self.logger.info("Restarting Live Data and clearing workspaces")
self.stop()
time.sleep(1.0)
mtd.clear()
self.start()


# ##################
# register a signal handler so we can exit gracefully if someone kills us
Expand Down Expand Up @@ -157,6 +168,10 @@ def __init__(self, filename):
self.accumMethod = str(json_doc.get("accum_method", "Add"))
self.periods = json_doc.get("periods", None)
self.spectra = json_doc.get("spectra", None)
self.system_mem_limit_perc = json_doc.get("system_mem_limit_perc", 25) # set to 0 to disable
self.mem_check_interval_sec = json_doc.get("mem_check_interval_sec", 1)
self.mem_limit = psutil.virtual_memory().total * self.system_mem_limit_perc / 100
self.proc_pid = psutil.Process(os.getpid())

# location of the scripts
self.script_dir = json_doc.get("script_dir")
Expand Down Expand Up @@ -321,9 +336,16 @@ def process_default(self, event):
self.scriptfiles[event.pathname] = newmd5
# restart the service
self.logger.info(f'Processing script "{event.pathname}" changed - restarting ' '"StartLiveData"')
self.livemanager.stop()
time.sleep(1.0) # seconds
self.livemanager.start()
self.livemanager.restart_and_clear()


def memory_checker(config, livemanager):
while True:
mem_used = config.proc_pid.memory_info().rss
if mem_used > config.mem_limit:
logger.error(f"Memory usage {mem_used * CONVERSION_FACTOR_BYTES_TO_MB:.2f} MB exceeds limit")
livemanager.restart_and_clear()
time.sleep(config.mem_check_interval_sec)


# determine the configuration file
Expand Down Expand Up @@ -355,6 +377,11 @@ def process_default(self, event):
# start up the live data
liveDataMgr.start()

# start the memory checker
if config.system_mem_limit_perc > 0:
memory_thread = threading.Thread(target=memory_checker, args=(config, liveDataMgr))
rosswhitfield marked this conversation as resolved.
Show resolved Hide resolved
memory_thread.start()

# inotify will keep the program running
notifier.loop()

Expand Down
8 changes: 8 additions & 0 deletions test/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,11 @@ Once the first chunk of live data is processed, `ctrl-C` will
interrupt the process and it will close cleanly.

In testing mode, the logging will go to `${PWD}/livereduce.log` and can be watched with `tail -F livereduce.log`


Example using event data, to test memory monitoring
----------------------------------------------------

This test case will continuously accumulate events until it fails.

Start the server using `test/fake_event_server.py` and use the configuration `test/fake_event.conf`.
9 changes: 9 additions & 0 deletions test/fake_event.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"instrument": "ISIS_Event",
"script_dir": "test",
"update_every": 3,
"CONDA_ENV": "livereduce",
"accum_method":"Add",
"system_mem_limit_perc": 25,
"mem_check_interval_sec": 1
}
24 changes: 24 additions & 0 deletions test/fake_event_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from threading import Thread

from mantid import AlgorithmManager, ConfigService
from mantid.simpleapi import FakeISISEventDAE

facility = ConfigService.getFacility().name()
ConfigService.setFacility("TEST_LIVE")


def startServer():
FakeISISEventDAE(NEvents=1000000)


try:
thread = Thread(target=startServer)
thread.start()
thread.join()
except Exception as e: # noqa: BLE001
print(e)
alg = AlgorithmManager.newestInstanceOf("FakeISISEventDAE")
if alg.isRunning():
alg.cancel()
finally:
ConfigService.setFacility(facility)
1 change: 1 addition & 0 deletions test/reduce_ISIS_Event_live_proc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
print("Running proc script")
Loading