Skip to content

Commit

Permalink
Merge pull request #4 from kwaegema/master
Browse files Browse the repository at this point in the history
daemonize option added
  • Loading branch information
stdweird committed Apr 14, 2014
2 parents 3f5a1d6 + 4db7d85 commit 293c56f
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 88 deletions.
232 changes: 145 additions & 87 deletions bin/zkrsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from kazoo.recipe.watchers import DataWatch
from kazoo.security import make_digest_acl
from vsc.utils import fancylogger
from vsc.utils.daemon import Daemon
from vsc.utils.generaloption import simple_option
from vsc.zk.configparser import get_rootinfo, parse_zkconfig, parse_acls
from vsc.zk.rsync.destination import RsyncDestination
Expand All @@ -53,6 +54,130 @@
CL_SOURCE = "source"
logger = fancylogger.getLogger()


class ZkrsDaemon(Daemon):
def __init__(self, pidfile, zkrs_type, zkrs_options, zkrs_kwargs):
self.zkrs_type = zkrs_type
self.zkrs_options = zkrs_options
self.zkrs_kwargs = zkrs_kwargs
Daemon.__init__(self, pidfile)

def run(self):
start_zkrs(self.zkrs_type, self.zkrs_options, self.zkrs_kwargs)


def init_logging(logfile, session, rstype):
"""Initiates the logfile"""
logfile = logfile % {
'session': session,
'rstype': rstype,
'pid': str(os.getpid())
}
logdir = os.path.dirname(logfile)
if logdir:
if not os.path.exists(logdir):
os.makedirs(logdir)
os.chmod(logdir, stat.S_IRWXU)

fancylogger.logToFile(logfile)
logger.debug('Logging to file %s:' % logfile)

def init_pidfile(pidfile, session, rstype):
""" Prepare pidfile """
if not pidfile:
logger.error('No PID file given!')
sys.exit(1)
else:
pidfile = pidfile % {
'session': session,
'rstype': rstype,
'pid': str(os.getpid())
}
piddir = os.path.dirname(pidfile)
if piddir:
if not os.path.exists(piddir):
os.makedirs(piddir)
os.chmod(piddir, stat.S_IRWXU)
return pidfile

def get_state(servers, kwargs):
"""Get the state of a running session"""
rsyncP = RsyncSource(go.options.servers, **kwargs)
logger.info('Progress: %s of %s paths remaining' % (rsyncP.len_paths(), rsyncP.paths_total))
rsyncP.exit()
sys.exit(0)

def do_pathsonly(options, kwargs):
"""Only build the pathqueue and return timings"""
kwargs['rsyncdepth'] = options.depth
kwargs['excludere'] = options.excludere
rsyncP = RsyncSource(options.servers, **kwargs)
locked = rsyncP.acq_lock()
if locked:
starttime = time.time()
rsyncP.build_pathqueue()
endtime = time.time()
timing = endtime - starttime
pathqueue = rsyncP.path_queue
logger.info('Building with depth %i took %f seconds walltime. there are %i paths in the Queue'
% (options.depth, timing, len(pathqueue)))
rsyncP.delete(pathqueue.path, recursive=True)
rsyncP.release_lock()
else:
logger.error('There is already a lock on the pathtree of this session')

rsyncP.exit()
sys.exit(0)

def start_destination(options, kwargs):
"""Starts a destination: Start zookeeper connection and rsync daemon"""
kwargs['rsyncport'] = options.rsyncport
kwargs['startport'] = options.startport
kwargs['domain'] = options.domain
rsyncD = RsyncDestination(options.servers, **kwargs)
rsyncD.run()

logger.debug('%s Ready' % rsyncD.get_whoami())
rsyncD.exit()
sys.exit(0)

def start_source(options, kwargs):
""" Start a rsync source"""
kwargs['rsyncdepth'] = options.depth
kwargs['dryrun'] = options.dryrun
kwargs['delete'] = options.delete
kwargs['excludere'] = options.excludere
# Start zookeeper connections
rsyncS = RsyncSource(options.servers, **kwargs)
# Try to retrieve session lock
locked = rsyncS.acq_lock()

if locked:
logger.debug('lock acquired')
watchnode = rsyncS.start_ready_rwatch()
if not watchnode:
sys.exit(1)
paths_total = rsyncS.build_pathqueue()
todo_paths = paths_total
while not rsyncS.isempty_pathqueue():
if todo_paths != rsyncS.len_paths(): # Output progress state
todo_paths = rsyncS.len_paths()
logger.info('Progress: %s of %s paths remaining' % (todo_paths, paths_total))
time.sleep(SLEEP_TIME)
rsyncS.shutdown_all()

else:
rsyncS.ready_with_stop_watch()
logger.debug('ready to process paths')
while not rsyncS.is_ready():
logger.debug('trying to get a path out of Queue')
rsyncS.rsync(TIME_OUT)

logger.debug('%s Ready' % rsyncS.get_whoami())

rsyncS.exit()
sys.exit(0)

def zkrsync_parse(options):
"""Takes options of simple_option and returns all the parameters after checks"""
if not options.servers:
Expand Down Expand Up @@ -80,6 +205,17 @@ def zkrsync_parse(options):

return rootcreds, admin_acl, rstype

def start_zkrs(rstype, options, kwargs):
""" Start a run of zkrs"""
if options.state:
get_state(options.servers, kwargs)
elif options.pathsonly:
do_pathsonly(options, kwargs)
elif rstype == CL_DEST:
start_destination(options, kwargs)
elif rstype == CL_SOURCE:
start_source(options, kwargs)

def main():
""" Start a new rsync client (destination or source) in a specified session """
options = {
Expand All @@ -103,30 +239,19 @@ def main():
# Source clients options; should be the same on all clients of the session!:
'delete' : ('run rsync with --delete', None, 'store_true', False),
# Individual client options
'daemon' : ('daemonize client', None, 'store_true', False),
'domain' : ('substitute domain', None, 'store', None),
'logfile' : ('Output to logfile', None, 'store', '/tmp/zkrsync/%(session)s-%(rstype)s-%(pid)s.log'),
'pidfile' : ('Pidfile template', None, 'store', '/tmp/zkrsync/%(session)s-%(rstype)s-%(pid)s.pid'),
# Individual Destination client specific options
'rsyncport' : ('force port on which rsyncd binds', "int", 'store', None),
'startport' : ('offset to look for rsyncd ports', "int", 'store', 4444)
}

go = simple_option(options)
acreds, admin_acl, rstype = zkrsync_parse(go.options)

if go.options.logfile:
logfile = go.options.logfile % {
'session': go.options.session,
'rstype': rstype,
'pid': str(os.getpid())
}
logdir = os.path.dirname(logfile)
if logdir:
if not os.path.exists(logdir):
os.makedirs(logdir)
os.chmod(logdir, stat.S_IRWXU)

fancylogger.logToFile(logfile)
logger.debug('Logging to file %s:' % logfile)
init_logging(go.options.logfile, go.options.session, rstype)

kwargs = {
'session' : go.options.session,
Expand All @@ -135,80 +260,13 @@ def main():
'rsyncpath' : go.options.rsyncpath,
'netcat' : go.options.netcat,
}
if go.options.state:
rsyncP = RsyncSource(go.options.servers, **kwargs)
logger.info('Progress: %s of %s paths remaining' % (rsyncP.len_paths(), rsyncP.paths_total))
rsyncP.exit()
sys.exit(0)

elif go.options.pathsonly:
kwargs['rsyncdepth'] = go.options.depth
kwargs['excludere'] = go.options.excludere
rsyncP = RsyncSource(go.options.servers, **kwargs)
locked = rsyncP.acq_lock()
if locked:
starttime = time.time()
rsyncP.build_pathqueue()
endtime = time.time()
timing = endtime - starttime
pathqueue = rsyncP.path_queue
logger.info('Building with depth %i took %f seconds walltime. there are %i paths in the Queue'
% (go.options.depth, timing, len(pathqueue)))
rsyncP.delete(pathqueue.path, recursive=True)
rsyncP.release_lock()
else:
logger.error('There is already a lock on the pathtree of this session')

rsyncP.exit()
sys.exit(0)

elif rstype == CL_DEST:
# Start zookeeper connection and rsync daemon
kwargs['rsyncport'] = go.options.rsyncport
kwargs['startport'] = go.options.startport
kwargs['domain'] = go.options.domain
rsyncD = RsyncDestination(go.options.servers, **kwargs)
rsyncD.run()

logger.debug('%s Ready' % rsyncD.get_whoami())
rsyncD.exit()
sys.exit(0)

elif rstype == CL_SOURCE:
# Start zookeeper connections
kwargs['rsyncdepth'] = go.options.depth
kwargs['dryrun'] = go.options.dryrun
kwargs['delete'] = go.options.delete
kwargs['excludere'] = go.options.excludere
rsyncS = RsyncSource(go.options.servers, **kwargs)
# Try to retrieve session lock
locked = rsyncS.acq_lock()

if locked:
logger.debug('lock acquired')
watchnode = rsyncS.start_ready_rwatch()
if not watchnode:
sys.exit(1)
paths_total = rsyncS.build_pathqueue()
todo_paths = paths_total
while not rsyncS.isempty_pathqueue():
if todo_paths != rsyncS.len_paths(): # Output progress state
todo_paths = rsyncS.len_paths()
logger.info('Progress: %s of %s paths remaining' % (todo_paths, paths_total))
time.sleep(SLEEP_TIME)
rsyncS.shutdown_all()
rsyncS.exit()
sys.exit(0)
else:
rsyncS.ready_with_stop_watch()
logger.debug('ready to process paths')
while not rsyncS.is_ready():
logger.debug('trying to get a path out of Queue')
rsyncS.rsync(TIME_OUT)

logger.debug('%s Ready' % rsyncS.get_whoami())
rsyncS.exit()
sys.exit(0)
if go.options.daemon:
pidfile = init_pidfile(go.options.pidfile, go.options.session, rstype)
zkrsdaemon = ZkrsDaemon(pidfile, rstype, go.options, kwargs)
zkrsdaemon.start()
else:
start_zkrs(rstype, go.options, kwargs)

if __name__ == '__main__':
main()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def remove_bdist_rpm_source_file():

PACKAGE = {
'name': 'vsc-zk',
'version': '0.6.1',
'version': '0.6.2',
'author': [sdw],
'maintainer': [sdw, kw],
'packages': ['vsc', 'vsc.zk', 'vsc.zk.rsync'],
Expand Down

0 comments on commit 293c56f

Please sign in to comment.