diff --git a/wavefront_push.py b/wavefront_push.py
index a7a2b7d..c849638 100644
--- a/wavefront_push.py
+++ b/wavefront_push.py
@@ -1,4 +1,4 @@
-#! /usr/bin/env python
+#!/bin/env python
 
 # TODOs:
 # - Clean up logging (Propose prefixing all messages with something identifying the plugin)
@@ -15,6 +15,7 @@
 # Import wavefront_push
 #
 #
+#     persist_connection true   # default value
 #     server 10.255.254.255
 #     port 2878
 #     prefix collectd
@@ -23,7 +24,11 @@
 #
 #
 
-import Queue, socket, threading, time, re
+import Queue
+import socket
+import threading
+import time
+import re
 from collections import namedtuple
 
 try:
@@ -32,6 +37,7 @@
     # This lets me at least parse the plugin before I feed it to collectd
     pass
 
+
 # Hack!
 # collectd < 5.5 does not have get_dataset, this should "mock" get_dataset and have
 # approximately the same behavior. ( collectd.get_dataset(typename) -> (name, type, min val, max val)
@@ -39,16 +45,21 @@ class CollectdDS:
     """Emulate collectd.get_dataset as it's a relatively "new" addition to collectd-python
 
     CollectDS(filename) - load tyeps.db given by filename
     CollectDS.__call__(typename) -> type description (name, type, min max) - drop-in replacement for collectd.get_dataset
     """
     ds_value_type = namedtuple("ds_value_type", ["name", "type", "min", "max"])
     # Collectd types
-    DS_TYPE_COUNTER=0
-    DS_TYPE_GAUGE=1
-    DS_TYPE_DERIVE=2
-    DS_TYPE_ABSOLUTE=3
-    TYPES={ 'DERIVE': DS_TYPE_DERIVE, 'GAUGE': DS_TYPE_GAUGE, 'ABSOLUTE': DS_TYPE_ABSOLUTE, 'COUNTER': DS_TYPE_COUNTER }
+    DS_TYPE_COUNTER = 0
+    DS_TYPE_GAUGE = 1
+    DS_TYPE_DERIVE = 2
+    DS_TYPE_ABSOLUTE = 3
+    TYPES = {
+        'DERIVE': DS_TYPE_DERIVE,
+        'GAUGE': DS_TYPE_GAUGE,
+        'ABSOLUTE': DS_TYPE_ABSOLUTE,
+        'COUNTER': DS_TYPE_COUNTER
+    }
+
     def __init__(self, ds_file=None):
         """__init__(ds_file)
         - ds_file(None) -> Empty types.db
@@ -57,6 +68,7 @@ def __init__(self, ds_file=None):
         self.__typesdb = {}
         if ds_file is not None:
             self.load_file(ds_file)
+
     def load_file(self, filename):
         with open(filename, "r") as file:
             for line in file:
@@ -67,33 +79,41 @@ def load_file(self, filename):
                 try:
                     values = []
                     (ds_name, values_string) = re.split('\s+', line, 1)
-                    for (name, type, min, max) in [ e.split(':') for e in re.split(',\s*', values_string) ]:
+                    for (name, type, min, max) in [e.split(':') for e in re.split(',\s*', values_string)]:
                         if min == 'U' or min == '':
                             min = '-inf'
                         if max == 'U' or max == '':
                             max = 'inf'
-                        values.append( self.ds_value_type(name, self.TYPES[type], float(min), float(max)) )
+                        values.append(self.ds_value_type(name, self.TYPES[type], float(min), float(max)))
                     self.__typesdb[ds_name] = values
-                except Exception, err:
+                except Exception:
                     raise
+
     def __call__(self, name):
         return self.__typesdb[name]
+
     def clear(self):
         self.__typesdb = {}
 
 # FIXME: Global variable, anecodotally this needs to be a reference, "seems to work" sharing between threads
 # We can potentially avoid the ambiguity around the threading by passing copies in the callback closures.
-CONFIG={'prefix': 'collectd', 'qsize': 10240}
+CONFIG = {
+    'prefix': 'collectd',
+    'qsize': 10240,
+    'persist_connection': True
+}
+
 def retry_connect(host, port, socket_timeout=15):
     """Retries connection indefinitely
 
     On connect:
     - Close receiving direction, we don't expect anything nor do we ever read from it.
    - Set the timeout on the socket (default %ds), the rationale for this is that if
      for some reason send blocks, we don't want to block infinitely.
- """ % ( socket_timeout ) + """ % (socket_timeout) while True: try: - s = socket.create_connection( (socket.gethostbyname(host), port) ) + collectd.debug("New connection to %s:%s" % (host, port)) + s = socket.create_connection((socket.gethostbyname(host), port)) s.shutdown(socket.SHUT_RD) # We're not receiving anything, close RX stream s.settimeout(socket_timeout) # We do not want to block indefinitely return s @@ -101,6 +121,7 @@ def retry_connect(host, port, socket_timeout=15): collectd.error("Failed to connect: " + str(e)) time.sleep(1) + def send_wavefront(host, port, item_queue): """send_wavefront(host, port, item_queue) - collectd-wavefront plugin eventloop connects to host:port, continually checks item_queue for messages and sends them. @@ -110,30 +131,22 @@ def send_wavefront(host, port, item_queue): collectd.info("starting wavefront sender") connection = None message = None - - while True: - if connection is None: - connection = retry_connect(host, port) + while True: if message is None: try: message = item_queue.get(False) except Queue.Empty: time.sleep(1) + if not CONFIG['persist_connection'] and connection is not None: + close_connection(connection) + connection = None continue - + + if connection is None: + connection = retry_connect(host, port) + try: - #strip white spaces and quotes for bad plugins - epoch=message.split(' ')[2] - if not re.search("^1[2-9]\d{8}$", epoch): - regex= '(\S+.*)(\s+(\d+\.\d+|\d+)\s+1\d{9}.*)' - m = re.search(regex, message) - if m: - string= m.groups()[0] - string = re.sub(r'[ |"|$|#|\']', '_', string) - remainder= m.groups()[1] - message= string+ remainder+"\n" - # Lazy "send everything", loosing messages is very much a possibility # we should know that we failed to send "something". connection.sendall(message) @@ -142,14 +155,22 @@ def send_wavefront(host, port, item_queue): # Lazy error handling; "something" went wrong, let's # give up and toss the message collectd.error("Failed to send items:" + str(e)) - try: - connection.shutdown(socket.SHUT_WR) - connection.close() - except Exception, e: - collectd.info("wavefront-connection close failed:" + str(e)) - pass + close_connection(connection) connection = None + +def close_connection(connection): + if connection is None: + return + try: + collectd.debug("Clossing connection") + connection.shutdown(socket.SHUT_WR) + connection.close() + except Exception, e: + collectd.info("wavefront-connection close failed:" + str(e)) + connection = None + + # Currently, the following configuration is accepted # # Server -> hostname/ip to connect to @@ -163,11 +184,13 @@ def send_wavefront(host, port, item_queue): def configure_callback(conf): """Collectd configuration callback. 
     - Parameter conf: Collectd configuration tree"""
-    tags={}
+    tags = {}
     CONFIG['prefix'] = 'collectd'
     override_types_db = []
     for node in conf.children:
         config_key = node.key.lower()
-        if config_key == 'tag':
+        if config_key == 'persist_connection':
+            CONFIG['persist_connection'] = node.values[0].lower() == 'true'
+        elif config_key == 'tag':
             try:
                 (tag, value) = node.values
@@ -190,20 +213,21 @@ def configure_callback(conf):
             override_types_db.append(node.values[0])
         else:
             collectd.error("config: %s unknown config key" % (config_key))
-    CONFIG['tags_append'] = " " + " ".join(["%s=%s" % (t,v) for (t,v) in tags.iteritems() ])
+    CONFIG['tags_append'] = " " + " ".join(["%s=%s" % (t, v) for (t, v) in tags.iteritems()])
 
-    if not 'get_dataset' in dir(collectd):
+    if 'get_dataset' not in dir(collectd):
         collectd.info("Trying to shoehorn in collectd types.db")
-        candidate_file_list = override_types_db + [ '/usr/share/collectd/types.db' ] + [ None ]
+        candidate_file_list = override_types_db + ['/usr/share/collectd/types.db'] + [None]
         for candidate_file in candidate_file_list:
             try:
                 setattr(collectd, 'get_dataset', CollectdDS(candidate_file))
                 collectd.info("Loaded %s as types.db" % (candidate_file))
                 break
             except Exception, e:
-                collectd.warning("Tried and failed to load %s: %s" % ( candidate_file, str(e) ))
-
-    collectd.info("config(%s,tags='%s'" % ( ",".join(["%s=%s" % (k,v) for (k,v) in CONFIG.iteritems()]), CONFIG['tags_append']))
+                collectd.warning("Tried and failed to load %s: %s" % (candidate_file, str(e)))
+
+    collectd.info("config(%s,tags='%s')" % (",".join(["%s=%s" % (k, v) for (k, v) in CONFIG.iteritems()]), CONFIG['tags_append']))
+
 
 def init_callback():
     """Collectd initialization callback.
@@ -212,7 +236,7 @@ def init_callback():
         queue_size = CONFIG['qsize']
     except KeyError:
         queue_size = 1024
-    CONFIG['queue']=Queue.Queue(queue_size)
+    CONFIG['queue'] = Queue.Queue(queue_size)
     if CONFIG['server'] and CONFIG['port']:
         sender = threading.Thread(target=send_wavefront, args=(CONFIG['server'], CONFIG['port'], CONFIG['queue']))
         sender.setDaemon(True)
@@ -221,6 +245,7 @@ def init_callback():
     else:
         collectd.error("wavefront-forwarder has no destination to send data")
 
+
 def write_callback(value):
     """Collectd write callback, responsible for formatting and writing
     messages onto the send queue"""
@@ -255,11 +280,11 @@ def write_callback(value):
     # collectd.interface-em1.if_octets.tx = 10
     # collectd.interface-em1.if_octets.rx = 20
     #
-
-    metric_name = "%s%s.%s%s" % ( value.plugin,
-                                  '.' + value.plugin_instance if len(value.plugin_instance) > 0 else '',
-                                  value.type,
-                                  '.' + value.type_instance if len(value.type_instance) > 0 else '' )
+
+    metric_name = "%s%s.%s%s" % (value.plugin,
+                                 '-' + value.plugin_instance if len(value.plugin_instance) > 0 else '',
+                                 value.type,
+                                 '-' + value.type_instance if len(value.type_instance) > 0 else '')
 
     try:
         prefix = CONFIG['prefix']
@@ -273,17 +298,17 @@ def write_callback(value):
     except KeyError, e:
         tags_append = ''
 
-    append_names = [ '.' + append_name if append_name != 'value' else ''
-                     for (append_name, _, _, _)
-                     in collectd.get_dataset(value.type) ]
+    append_names = ['.' + append_name if append_name != 'value' else ''
+                    for (append_name, _, _, _)
+                    in collectd.get_dataset(value.type)]
     if len(append_names) != len(value.values):
         collectd.error("len(ds_names) != len(value.values)")
         return
-
-    msg = "".join([ "%s %f %d host=%s%s\n" % (prefix + metric_name + postfix, metric_value, value.time, value.host, tags_append)
-                    for (postfix, metric_value)
-                    in zip(append_names, value.values) ])
+
+    msg = "".join(["%s %f %d host=%s%s\n" % (prefix + metric_name + postfix, metric_value, value.time, value.host, tags_append)
+                   for (postfix, metric_value)
+                   in zip(append_names, value.values)])
     try:
         CONFIG['queue'].put(msg, block=False)
     except Exception, e:
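Review aid, not part of the patch: a minimal smoke test for the new persist_connection behavior, runnable outside a real collectd. It is a sketch under a few assumptions: the script sits next to wavefront_push.py; the module's import-time use of collectd is plain attribute access (logging helpers and any register_* calls, which the stub below turns into no-ops); and the unchanged tail of send_wavefront clears message after a successful sendall, as the loop structure implies. Python 2, matching the module's Queue/iteritems usage; expect roughly a one-second pause before the close, from the time.sleep(1) in the drain path.

# smoke_test_persist.py -- hypothetical helper, not part of this change.
import socket
import sys
import threading
import Queue


class FakeCollectd(object):
    """Stand-in for the collectd module: every attribute is a no-op callable."""
    def __getattr__(self, name):
        return lambda *args, **kwargs: None


# Must be installed before the plugin is imported, so its `import collectd`
# resolves to the stub instead of failing or pulling in the real module.
sys.modules["collectd"] = FakeCollectd()

import wavefront_push

# Local listener playing the role of the Wavefront proxy.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
host, port = listener.getsockname()

# Exercise the new non-persistent mode.
wavefront_push.CONFIG["persist_connection"] = False

queue = Queue.Queue(16)
queue.put("collectd.test.metric 1.000000 1457000000 host=localhost\n")

sender = threading.Thread(target=wavefront_push.send_wavefront,
                          args=(host, port, queue))
sender.setDaemon(True)
sender.start()

conn, _ = listener.accept()
print repr(conn.recv(4096))  # the queued metric line
print repr(conn.recv(4096))  # '' once the sender closes the drained connection

With persist_connection left at its default of True, the second recv should instead block: the sender keeps the socket open between queue drains, which is the behavior this change makes configurable.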