diff --git a/bin/zkrsync.py b/bin/zkrsync.py index 01e4e42..b5b8b06 100644 --- a/bin/zkrsync.py +++ b/bin/zkrsync.py @@ -42,6 +42,7 @@ from kazoo.recipe.watchers import DataWatch from kazoo.security import make_digest_acl from vsc.utils import fancylogger +from vsc.utils.daemon import Daemon from vsc.utils.generaloption import simple_option from vsc.zk.configparser import get_rootinfo, parse_zkconfig, parse_acls from vsc.zk.rsync.destination import RsyncDestination @@ -53,6 +54,130 @@ CL_SOURCE = "source" logger = fancylogger.getLogger() + +class ZkrsDaemon(Daemon): + def __init__(self, pidfile, zkrs_type, zkrs_options, zkrs_kwargs): + self.zkrs_type = zkrs_type + self.zkrs_options = zkrs_options + self.zkrs_kwargs = zkrs_kwargs + Daemon.__init__(self, pidfile) + + def run(self): + start_zkrs(self.zkrs_type, self.zkrs_options, self.zkrs_kwargs) + + +def init_logging(logfile, session, rstype): + """Initiates the logfile""" + logfile = logfile % { + 'session': session, + 'rstype': rstype, + 'pid': str(os.getpid()) + } + logdir = os.path.dirname(logfile) + if logdir: + if not os.path.exists(logdir): + os.makedirs(logdir) + os.chmod(logdir, stat.S_IRWXU) + + fancylogger.logToFile(logfile) + logger.debug('Logging to file %s:' % logfile) + +def init_pidfile(pidfile, session, rstype): + """ Prepare pidfile """ + if not pidfile: + logger.error('No PID file given!') + sys.exit(1) + else: + pidfile = pidfile % { + 'session': session, + 'rstype': rstype, + 'pid': str(os.getpid()) + } + piddir = os.path.dirname(pidfile) + if piddir: + if not os.path.exists(piddir): + os.makedirs(piddir) + os.chmod(piddir, stat.S_IRWXU) + return pidfile + +def get_state(servers, kwargs): + """Get the state of a running session""" + rsyncP = RsyncSource(go.options.servers, **kwargs) + logger.info('Progress: %s of %s paths remaining' % (rsyncP.len_paths(), rsyncP.paths_total)) + rsyncP.exit() + sys.exit(0) + +def do_pathsonly(options, kwargs): + """Only build the pathqueue and return timings""" + kwargs['rsyncdepth'] = options.depth + kwargs['excludere'] = options.excludere + rsyncP = RsyncSource(options.servers, **kwargs) + locked = rsyncP.acq_lock() + if locked: + starttime = time.time() + rsyncP.build_pathqueue() + endtime = time.time() + timing = endtime - starttime + pathqueue = rsyncP.path_queue + logger.info('Building with depth %i took %f seconds walltime. there are %i paths in the Queue' + % (options.depth, timing, len(pathqueue))) + rsyncP.delete(pathqueue.path, recursive=True) + rsyncP.release_lock() + else: + logger.error('There is already a lock on the pathtree of this session') + + rsyncP.exit() + sys.exit(0) + +def start_destination(options, kwargs): + """Starts a destination: Start zookeeper connection and rsync daemon""" + kwargs['rsyncport'] = options.rsyncport + kwargs['startport'] = options.startport + kwargs['domain'] = options.domain + rsyncD = RsyncDestination(options.servers, **kwargs) + rsyncD.run() + + logger.debug('%s Ready' % rsyncD.get_whoami()) + rsyncD.exit() + sys.exit(0) + +def start_source(options, kwargs): + """ Start a rsync source""" + kwargs['rsyncdepth'] = options.depth + kwargs['dryrun'] = options.dryrun + kwargs['delete'] = options.delete + kwargs['excludere'] = options.excludere + # Start zookeeper connections + rsyncS = RsyncSource(options.servers, **kwargs) + # Try to retrieve session lock + locked = rsyncS.acq_lock() + + if locked: + logger.debug('lock acquired') + watchnode = rsyncS.start_ready_rwatch() + if not watchnode: + sys.exit(1) + paths_total = rsyncS.build_pathqueue() + todo_paths = paths_total + while not rsyncS.isempty_pathqueue(): + if todo_paths != rsyncS.len_paths(): # Output progress state + todo_paths = rsyncS.len_paths() + logger.info('Progress: %s of %s paths remaining' % (todo_paths, paths_total)) + time.sleep(SLEEP_TIME) + rsyncS.shutdown_all() + + else: + rsyncS.ready_with_stop_watch() + logger.debug('ready to process paths') + while not rsyncS.is_ready(): + logger.debug('trying to get a path out of Queue') + rsyncS.rsync(TIME_OUT) + + logger.debug('%s Ready' % rsyncS.get_whoami()) + + rsyncS.exit() + sys.exit(0) + def zkrsync_parse(options): """Takes options of simple_option and returns all the parameters after checks""" if not options.servers: @@ -80,6 +205,17 @@ def zkrsync_parse(options): return rootcreds, admin_acl, rstype +def start_zkrs(rstype, options, kwargs): + """ Start a run of zkrs""" + if options.state: + get_state(options.servers, kwargs) + elif options.pathsonly: + do_pathsonly(options, kwargs) + elif rstype == CL_DEST: + start_destination(options, kwargs) + elif rstype == CL_SOURCE: + start_source(options, kwargs) + def main(): """ Start a new rsync client (destination or source) in a specified session """ options = { @@ -103,8 +239,10 @@ def main(): # Source clients options; should be the same on all clients of the session!: 'delete' : ('run rsync with --delete', None, 'store_true', False), # Individual client options + 'daemon' : ('daemonize client', None, 'store_true', False), 'domain' : ('substitute domain', None, 'store', None), 'logfile' : ('Output to logfile', None, 'store', '/tmp/zkrsync/%(session)s-%(rstype)s-%(pid)s.log'), + 'pidfile' : ('Pidfile template', None, 'store', '/tmp/zkrsync/%(session)s-%(rstype)s-%(pid)s.pid'), # Individual Destination client specific options 'rsyncport' : ('force port on which rsyncd binds', "int", 'store', None), 'startport' : ('offset to look for rsyncd ports', "int", 'store', 4444) @@ -112,21 +250,8 @@ def main(): go = simple_option(options) acreds, admin_acl, rstype = zkrsync_parse(go.options) - if go.options.logfile: - logfile = go.options.logfile % { - 'session': go.options.session, - 'rstype': rstype, - 'pid': str(os.getpid()) - } - logdir = os.path.dirname(logfile) - if logdir: - if not os.path.exists(logdir): - os.makedirs(logdir) - os.chmod(logdir, stat.S_IRWXU) - - fancylogger.logToFile(logfile) - logger.debug('Logging to file %s:' % logfile) + init_logging(go.options.logfile, go.options.session, rstype) kwargs = { 'session' : go.options.session, @@ -135,80 +260,13 @@ def main(): 'rsyncpath' : go.options.rsyncpath, 'netcat' : go.options.netcat, } - if go.options.state: - rsyncP = RsyncSource(go.options.servers, **kwargs) - logger.info('Progress: %s of %s paths remaining' % (rsyncP.len_paths(), rsyncP.paths_total)) - rsyncP.exit() - sys.exit(0) - - elif go.options.pathsonly: - kwargs['rsyncdepth'] = go.options.depth - kwargs['excludere'] = go.options.excludere - rsyncP = RsyncSource(go.options.servers, **kwargs) - locked = rsyncP.acq_lock() - if locked: - starttime = time.time() - rsyncP.build_pathqueue() - endtime = time.time() - timing = endtime - starttime - pathqueue = rsyncP.path_queue - logger.info('Building with depth %i took %f seconds walltime. there are %i paths in the Queue' - % (go.options.depth, timing, len(pathqueue))) - rsyncP.delete(pathqueue.path, recursive=True) - rsyncP.release_lock() - else: - logger.error('There is already a lock on the pathtree of this session') - - rsyncP.exit() - sys.exit(0) - elif rstype == CL_DEST: - # Start zookeeper connection and rsync daemon - kwargs['rsyncport'] = go.options.rsyncport - kwargs['startport'] = go.options.startport - kwargs['domain'] = go.options.domain - rsyncD = RsyncDestination(go.options.servers, **kwargs) - rsyncD.run() - - logger.debug('%s Ready' % rsyncD.get_whoami()) - rsyncD.exit() - sys.exit(0) - - elif rstype == CL_SOURCE: - # Start zookeeper connections - kwargs['rsyncdepth'] = go.options.depth - kwargs['dryrun'] = go.options.dryrun - kwargs['delete'] = go.options.delete - kwargs['excludere'] = go.options.excludere - rsyncS = RsyncSource(go.options.servers, **kwargs) - # Try to retrieve session lock - locked = rsyncS.acq_lock() - - if locked: - logger.debug('lock acquired') - watchnode = rsyncS.start_ready_rwatch() - if not watchnode: - sys.exit(1) - paths_total = rsyncS.build_pathqueue() - todo_paths = paths_total - while not rsyncS.isempty_pathqueue(): - if todo_paths != rsyncS.len_paths(): # Output progress state - todo_paths = rsyncS.len_paths() - logger.info('Progress: %s of %s paths remaining' % (todo_paths, paths_total)) - time.sleep(SLEEP_TIME) - rsyncS.shutdown_all() - rsyncS.exit() - sys.exit(0) - else: - rsyncS.ready_with_stop_watch() - logger.debug('ready to process paths') - while not rsyncS.is_ready(): - logger.debug('trying to get a path out of Queue') - rsyncS.rsync(TIME_OUT) - - logger.debug('%s Ready' % rsyncS.get_whoami()) - rsyncS.exit() - sys.exit(0) + if go.options.daemon: + pidfile = init_pidfile(go.options.pidfile, go.options.session, rstype) + zkrsdaemon = ZkrsDaemon(pidfile, rstype, go.options, kwargs) + zkrsdaemon.start() + else: + start_zkrs(rstype, go.options, kwargs) if __name__ == '__main__': main() diff --git a/setup.py b/setup.py index 34a3a35..2d0d61d 100755 --- a/setup.py +++ b/setup.py @@ -47,7 +47,7 @@ def remove_bdist_rpm_source_file(): PACKAGE = { 'name': 'vsc-zk', - 'version': '0.6.1', + 'version': '0.6.2', 'author': [sdw], 'maintainer': [sdw, kw], 'packages': ['vsc', 'vsc.zk', 'vsc.zk.rsync'],