From c8ff208112abf8e97219ed1aec22d612c616840b Mon Sep 17 00:00:00 2001
From: Rene Martin
Date: Tue, 28 Jun 2016 10:52:33 +0100
Subject: [PATCH] Add support for non-persistent connections.

In a scenario where the Wavefront proxy runs in an ASG behind an ELB
and the proxy instances may change, it is better not to use persistent
connections, so that every connection is created against a currently
running instance.

Furthermore, if you use the ELB's connection draining feature,
non-persistent connections let you replace the instances in the ASG
without any data loss.

Because the Wavefront protocol does not expect an ACK from the remote
server, the client cannot detect a dropped or improperly closed
connection, which can lead to sending metrics to nowhere.
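For illustration only (this sketch is not part of the patch): with
persist_connection set to false, every flush becomes open -> send ->
close, roughly as below. The helper name send_one_batch and the example
hostname are made up; the SHUT_RD shutdown and the 15s timeout mirror
what retry_connect in the plugin already does. Python 2, like the
plugin itself.

    import socket

    def send_one_batch(host, port, lines, timeout=15):
        # A fresh connection per batch, so no socket outlives an
        # ELB instance replacement.
        s = socket.create_connection((host, port), timeout)
        try:
            s.shutdown(socket.SHUT_RD)  # Wavefront never ACKs, nothing to read
            s.sendall("".join(lines))
        finally:
            s.close()  # non-persistent: drop the socket immediately

    send_one_batch("wavefront-elb.example.com", 2878,
                   ["collectd.cpu-0.cpu-idle 99.0 1467107553 host=web01\n"])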
---
 wavefront_push.py | 137 +++++++++++++++++++++++++++------------------
 1 file changed, 81 insertions(+), 56 deletions(-)

diff --git a/wavefront_push.py b/wavefront_push.py
index a7a2b7d..c849638 100644
--- a/wavefront_push.py
+++ b/wavefront_push.py
@@ -1,4 +1,4 @@
-#! /usr/bin/env python
+#!/bin/env python
 
 # TODOs:
 # - Clean up logging (Propose prefixing all messages with something identifying the plugin)
@@ -15,6 +15,7 @@
 # Import wavefront_push
 #
 #
+# persist_connection true # default value
 # server 10.255.254.255
 # port 2878
 # prefix collectd
@@ -23,7 +24,11 @@
 #
 #
 
-import Queue, socket, threading, time, re
+import Queue
+import socket
+import threading
+import time
+import re
 from collections import namedtuple
 
 try:
@@ -32,6 +37,7 @@
     # This lets me at least parse the plugin before I feed it to collectd
     pass
 
+
 # Hack!
 # collectd < 5.5 does not have get_dataset, this should "mock" get_dataset and have
 # approximately the same behavior.
 # collectd.get_dataset(typename) -> (name, type, min val, max val)
@@ -39,16 +45,21 @@ class CollectdDS:
     """Emulate collectd.get_dataset as it's a relatively "new" addition to collectd-python
     CollectDS(filename) - load tyeps.db given by filename
     CollectDS.__call__(typename) -> type description (name, type, min max) - drop-in replacement for collectd.get_dataset
     """
     ds_value_type = namedtuple("ds_value_type", ["name", "type", "min", "max"])
     # Collectd types
-    DS_TYPE_COUNTER=0
-    DS_TYPE_GAUGE=1
-    DS_TYPE_DERIVE=2
-    DS_TYPE_ABSOLUTE=3
-    TYPES={ 'DERIVE': DS_TYPE_DERIVE, 'GAUGE': DS_TYPE_GAUGE, 'ABSOLUTE': DS_TYPE_ABSOLUTE, 'COUNTER': DS_TYPE_COUNTER }
+    DS_TYPE_COUNTER = 0
+    DS_TYPE_GAUGE = 1
+    DS_TYPE_DERIVE = 2
+    DS_TYPE_ABSOLUTE = 3
+    TYPES = {
+        'DERIVE': DS_TYPE_DERIVE,
+        'GAUGE': DS_TYPE_GAUGE,
+        'ABSOLUTE': DS_TYPE_ABSOLUTE,
+        'COUNTER': DS_TYPE_COUNTER
+    }
+
     def __init__(self, ds_file=None):
         """__init__(ds_file)
         - ds_file(None) -> Empty types.db
@@ -57,6 +68,7 @@ def __init__(self, ds_file=None):
         self.__typesdb = {}
         if ds_file is not None:
             self.load_file(ds_file)
+
     def load_file(self, filename):
         with open(filename, "r") as file:
             for line in file:
@@ -67,33 +79,41 @@ def load_file(self, filename):
                 try:
                     values = []
                     (ds_name, values_string) = re.split('\s+', line, 1)
-                    for (name, type, min, max) in [ e.split(':') for e in re.split(',\s*', values_string) ]:
+                    for (name, type, min, max) in [e.split(':') for e in re.split(',\s*', values_string)]:
                         if min == 'U' or min == '':
                             min = '-inf'
                         if max == 'U' or max == '':
                             max = 'inf'
-                        values.append( self.ds_value_type(name, self.TYPES[type], float(min), float(max)) )
+                        values.append(self.ds_value_type(name, self.TYPES[type], float(min), float(max)))
                     self.__typesdb[ds_name] = values
-                except Exception, err:
+                except Exception:
                     raise
+
     def __call__(self, name):
         return self.__typesdb[name]
+
     def clear(self):
         self.__typesdb = {}
 
 
 # FIXME: Global variable, anecodotally this needs to be a reference, "seems to work" sharing between threads
 # We can potentially avoid the ambiguity around the threading by passing copies in the callback closures.
-CONFIG={'prefix': 'collectd', 'qsize': 10240}
+CONFIG = {
+    'prefix': 'collectd',
+    'qsize': 10240,
+    'persist_connection': True
+}
+
 
 def retry_connect(host, port, socket_timeout=15):
     """Retries connection indefinitely
     On connect:
     - Close receiving direction, we don't expect anything nor do we ever read from it.
    - Set the timeout on the socket (default %ds), the rationale for this is that if for some reason send blocks, we don't want to block infinitely.
-    """ % ( socket_timeout )
+    """ % (socket_timeout)
     while True:
         try:
-            s = socket.create_connection( (socket.gethostbyname(host), port) )
+            collectd.debug("New connection to %s:%s" % (host, port))
+            s = socket.create_connection((socket.gethostbyname(host), port))
             s.shutdown(socket.SHUT_RD)    # We're not receiving anything, close RX stream
             s.settimeout(socket_timeout)  # We do not want to block indefinitely
             return s
@@ -101,6 +121,7 @@
             collectd.error("Failed to connect: " + str(e))
             time.sleep(1)
 
+
 def send_wavefront(host, port, item_queue):
     """send_wavefront(host, port, item_queue) - collectd-wavefront plugin eventloop
     connects to host:port, continually checks item_queue for messages and sends them.
@@ -110,30 +131,22 @@ def send_wavefront(host, port, item_queue):
     collectd.info("starting wavefront sender")
     connection = None
     message = None
-
-    while True:
-        if connection is None:
-            connection = retry_connect(host, port)
+    while True:
         if message is None:
             try:
                 message = item_queue.get(False)
             except Queue.Empty:
                 time.sleep(1)
+                if not CONFIG['persist_connection'] and connection is not None:
+                    close_connection(connection)
+                    connection = None
                 continue
-
+
+        if connection is None:
+            connection = retry_connect(host, port)
+
         try:
-            #strip white spaces and quotes for bad plugins
-            epoch=message.split(' ')[2]
-            if not re.search("^1[2-9]\d{8}$", epoch):
-                regex= '(\S+.*)(\s+(\d+\.\d+|\d+)\s+1\d{9}.*)'
-                m = re.search(regex, message)
-                if m:
-                    string= m.groups()[0]
-                    string = re.sub(r'[ |"|$|#|\']', '_', string)
-                    remainder= m.groups()[1]
-                    message= string+ remainder+"\n"
-
             # Lazy "send everything", loosing messages is very much a possibility
             # we should know that we failed to send "something".
             connection.sendall(message)
@@ -142,14 +155,22 @@ def send_wavefront(host, port, item_queue):
             # Lazy error handling; "something" went wrong, let's
             # give up and toss the message
             collectd.error("Failed to send items:" + str(e))
-            try:
-                connection.shutdown(socket.SHUT_WR)
-                connection.close()
-            except Exception, e:
-                collectd.info("wavefront-connection close failed:" + str(e))
-                pass
+            close_connection(connection)
             connection = None
 
+
+def close_connection(connection):
+    if connection is None:
+        return
+    try:
+        collectd.debug("Closing connection")
+        connection.shutdown(socket.SHUT_WR)
+        connection.close()
+    except Exception, e:
+        collectd.info("wavefront-connection close failed:" + str(e))
+    connection = None
+
+
 # Currently, the following configuration is accepted
 #
 # Server -> hostname/ip to connect to
@@ -163,11 +184,13 @@
 def configure_callback(conf):
     """Collectd configuration callback.
     - Parameter conf: Collectd configuration tree"""
-    tags={}
+    tags = {}
     CONFIG['prefix'] = 'collectd'
     override_types_db = []
     for node in conf.children:
         config_key = node.key.lower()
+        if config_key == 'persist_connection':
+            CONFIG['persist_connection'] = node.values[0].lower() == 'true'
         if config_key == 'tag':
             try:
                 (tag, value) = node.values
@@ -190,20 +213,21 @@ def configure_callback(conf):
             override_types_db.append(node.values[0])
         else:
             collectd.error("config: %s unknown config key" % (config_key))
-    CONFIG['tags_append'] = " " + " ".join(["%s=%s" % (t,v) for (t,v) in tags.iteritems() ])
+    CONFIG['tags_append'] = " " + " ".join(["%s=%s" % (t, v) for (t, v) in tags.iteritems()])
 
-    if not 'get_dataset' in dir(collectd):
+    if 'get_dataset' not in dir(collectd):
         collectd.info("Trying to shoehorn in collectd types.db")
-        candidate_file_list = override_types_db + [ '/usr/share/collectd/types.db' ] + [ None ]
+        candidate_file_list = override_types_db + ['/usr/share/collectd/types.db'] + [None]
         for candidate_file in candidate_file_list:
             try:
                 setattr(collectd, 'get_dataset', CollectdDS(candidate_file))
                 collectd.info("Loaded %s as types.db" % (candidate_file))
                 break
             except Exception, e:
-                collectd.warning("Tried and failed to load %s: %s" % ( candidate_file, str(e) ))
-
-    collectd.info("config(%s,tags='%s'" % ( ",".join(["%s=%s" % (k,v) for (k,v) in CONFIG.iteritems()]), CONFIG['tags_append']))
+                collectd.warning("Tried and failed to load %s: %s" % (candidate_file, str(e)))
+
+    collectd.info("config(%s,tags='%s'" % (",".join(["%s=%s" % (k, v) for (k, v) in CONFIG.iteritems()]), CONFIG['tags_append']))
+
 
 def init_callback():
     """Collectd initialization callback.
@@ -212,7 +236,7 @@
         queue_size = CONFIG['qsize']
     except KeyError:
         queue_size = 1024
-    CONFIG['queue']=Queue.Queue(queue_size)
+    CONFIG['queue'] = Queue.Queue(queue_size)
     if CONFIG['server'] and CONFIG['port']:
         sender = threading.Thread(target=send_wavefront, args=(CONFIG['server'], CONFIG['port'], CONFIG['queue']))
         sender.setDaemon(True)
@@ -221,6 +245,7 @@
     else:
         collectd.error("wavefront-forwarder has no destination to send data")
 
+
 def write_callback(value):
     """Collectd write callback, responsible for formatting and writing messages
     onto the send queue"""
@@ -255,11 +280,11 @@ def write_callback(value):
     # collectd.interface-em1.if_octets.tx = 10
     # collectd.interface-em1.if_octets.rx = 20
     #
-
-    metric_name = "%s%s.%s%s" % ( value.plugin,
-                                  '.' + value.plugin_instance if len(value.plugin_instance) > 0 else '',
-                                  value.type,
-                                  '.' + value.type_instance if len(value.type_instance) > 0 else '' )
+
+    metric_name = "%s%s.%s%s" % (value.plugin,
+                                 '-' + value.plugin_instance if len(value.plugin_instance) > 0 else '',
+                                 value.type,
+                                 '-' + value.type_instance if len(value.type_instance) > 0 else '')
 
     try:
         prefix = CONFIG['prefix']
@@ -273,17 +298,17 @@
     except KeyError, e:
         tags_append = ''
 
-    append_names = [ '.' + append_name if append_name != 'value' else ''
-                     for (append_name, _, _, _)
-                     in collectd.get_dataset(value.type) ]
+    append_names = ['.' + append_name if append_name != 'value' else ''
+                    for (append_name, _, _, _)
+                    in collectd.get_dataset(value.type)]
 
     if len(append_names) != len(value.values):
         collectd.error("len(ds_names) != len(value.values)")
         return
-
-    msg = "".join([ "%s %f %d host=%s%s\n" % (prefix + metric_name + postfix, metric_value, value.time, value.host, tags_append)
-                    for (postfix, metric_value)
-                    in zip(append_names, value.values) ])
+
+    msg = "".join(["%s %f %d host=%s%s\n" % (prefix + metric_name + postfix, metric_value, value.time, value.host, tags_append)
+                   for (postfix, metric_value)
+                   in zip(append_names, value.values)])
     try:
         CONFIG['queue'].put(msg, block=False)
     except Exception, e: