diff --git a/statsdpy/__init__.py b/statsdpy/__init__.py
index 0028de9..56b88b8 100644
--- a/statsdpy/__init__.py
+++ b/statsdpy/__init__.py
@@ -2,7 +2,7 @@
 
 
 #: Version information (major, minor, revision[, 'dev']).
-version_info = (0, 0, 11)
+version_info = (0, 0, 12)
 #: Version string 'major.minor.revision'.
 version = __version__ = ".".join(map(str, version_info))
 gettext.install('statsdpy')
diff --git a/statsdpy/statsd.py b/statsdpy/statsd.py
index fc97866..a7daf09 100644
--- a/statsdpy/statsd.py
+++ b/statsdpy/statsd.py
@@ -15,6 +15,7 @@ class StatsdServer(object):
 
     TRUE_VALUES = set(('true', '1', 'yes', 'on', 't', 'y'))
 
+
     def __init__(self, conf):
         self.logger = logging.getLogger('statsdpy')
         self.logger.setLevel(logging.INFO)
@@ -26,6 +27,7 @@ def __init__(self, conf):
         self.graphite_host = conf.get('graphite_host', '127.0.0.1')
         self.graphite_port = int(conf.get('graphite_port', '2003'))
         self.graphite_pport = int(conf.get('graphite_pickle_port', '2004'))
+        self.graphite_timeout = int(conf.get('graphite_timeout', '5'))
         self.pickle_proto = conf.get('pickle_protocol') in self.TRUE_VALUES
         self.max_batch_size = int(conf.get('pickle_batch_size', '300'))
         self.listen_addr = conf.get('listen_addr', '127.0.0.1')
@@ -82,7 +84,7 @@ def _get_batches(self, items):
         for i in xrange(0, len(items), self.max_batch_size):
             yield items[i:i + self.max_batch_size]
 
-    def report_stats(self, payload):
+    def report_stats(self, payload, is_retry=False):
         """
         Send data to graphite host
 
@@ -94,34 +96,51 @@ def report_stats(self, payload):
             else:
                 print "reporting stats -> {\n%s}" % payload
         try:
-            with eventlet.Timeout(5, True):
-                graphite = socket.socket()
+            graphite = socket.socket()
+            with eventlet.Timeout(self.graphite_timeout, True):
                 graphite.connect(self.graphite_addr)
                 graphite.sendall(payload)
                 graphite.close()
+        except eventlet.timeout.Timeout:
+            self.logger.critical("Timeout sending to graphite")
+            if self.debug:
+                print "Timeout talking to graphite"
+            if not is_retry:
+                self.logger.critical('Attempting 1 retry!')
+                self.report_stats(payload, is_retry=True)
+            else:
+                self.logger.critical('Already retried once, giving up')
         except Exception as err:
             self.logger.critical("error connecting to graphite: %s" % err)
             if self.debug:
                 print "error connecting to graphite: %s" % err
+        finally:
+            try:
+                graphite.close()
+            except Exception:
+                pass
 
     def stats_flush(self):
         """
         Periodically flush stats to graphite
         """
         while True:
-            eventlet.sleep(self.flush_interval)
-            if self.debug:
-                print "seen %d stats so far." % self.stats_seen
-                print "current counters: %s" % self.counters
-            if self.pickle_proto:
-                payload = self.pickle_payload()
-                if payload:
-                    for batch in payload:
-                        self.report_stats(batch)
-            else:
-                payload = self.plain_payload()
-                if payload:
-                    self.report_stats(payload)
+            try:
+                eventlet.sleep(self.flush_interval)
+                if self.debug:
+                    print "seen %d stats so far." % self.stats_seen
+                    print "current counters: %s" % self.counters
+                if self.pickle_proto:
+                    payload = self.pickle_payload()
+                    if payload:
+                        for batch in payload:
+                            self.report_stats(batch)
+                else:
+                    payload = self.plain_payload()
+                    if payload:
+                        self.report_stats(payload)
+            except Exception as err:  # safety net: keep the flush loop alive
+                self.logger.critical('Encountered error in stats_flush loop: %s' % err)
 
     def pickle_payload(self):
         """obtain stats payload in batches of pickle format"""
@@ -180,6 +199,7 @@ def plain_payload(self):
         for key in self.timers:
             if len(self.timers[key]) > 0:
                 self.process_timer_key(key, tstamp, payload)
+                self.timers[key] = []  # reset timer samples once flushed
         for key in self.gauges:
             payload.append("%s.%s %d %d\n" % (self.gauge_prefix,
                                               key,
@@ -333,7 +353,11 @@ def run(self):
             else:
                 for metric in data.splitlines():
                     if metric:
-                        self.decode_recvd(metric)
+                        try:
+                            self.decode_recvd(metric)
+                        except Exception as err:  # safety net
+                            self.logger.critical(
+                                "exception in decode_recvd: %s" % err)
 
 
 class Statsd(Daemon):
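
Note for reviewers: the hard-coded 5 second send timeout is now configurable as
'graphite_timeout' (same 5s default), a timed-out send is retried exactly once,
the socket is always closed via the new finally block, and plain_payload now
clears each timer's samples after they are flushed so they are not carried into
the next interval. Below is a minimal sketch of exercising the new option, not
part of the patch; it assumes statsdpy is importable and a carbon listener is
reachable, and the conf keys mirror the conf.get(...) calls in
StatsdServer.__init__:

    import time

    from statsdpy.statsd import StatsdServer

    conf = {
        'graphite_host': '127.0.0.1',
        'graphite_port': '2003',
        'graphite_timeout': '10',  # seconds before eventlet.Timeout fires
    }
    server = StatsdServer(conf)
    # plaintext graphite format: "<metric> <value> <timestamp>\n";
    # report_stats() now retries once on timeout, then logs and gives up
    server.report_stats("stats.example 1 %d\n" % time.time())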