Source code for jobslib.metrics.influxdb

"""
Module :mod:`jobslib.metrics.influxdb` provides :class:`InfluxDBMetrics`
writer.
"""

import datetime
import logging
import os
import time

import retrying

from influxdb.client import InfluxDBClient
from objectvalidator import option

from . import BaseMetrics
from ..config import ConfigGroup, RetryConfigMixin

__all__ = ['InfluxDBMetrics']

logger = logging.getLogger(__name__)


[docs]class InfluxDBMetrics(BaseMetrics): """ InfluxDB metrics implementation. For use of :class:`InfluxDBMetrics` write into :mod:`settings`: .. code-block:: python METRICS = { 'backend': 'jobslib.metrics.influxdb.InfluxDBMetrics', 'options': { 'host': 'hostname', 'port': 8086, 'username': 'root', 'password': 'root', 'database': 'dbname', 'retry_max_attempts': 10, 'retry_wait_multiplier': 50, }, } Or use :envvar:`JOBSLIB_METRICS_INFLUXDB_HOST`, :envvar:`JOBSLIB_METRICS_INFLUXDB_PORT`, :envvar:`JOBSLIB_METRICS_INFLUXDB_USERNAME`, :envvar:`JOBSLIB_METRICS_INFLUXDB_PASSWORD`, :envvar:`JOBSLIB_METRICS_INFLUXDB_DBNAME`, :envvar:`JOBSLIB_METRICS_INFLUXDB_RETRY_MAX_ATTEMPTS` and :envvar:`JOBSLIB_METRICS_INFLUXDB_RETRY_WAIT_MULTIPLIER` environment variables. """ class OptionsConfig(RetryConfigMixin, ConfigGroup): """ Consul liveness options. """ retry_env_prefix = 'JOBSLIB_METRICS_INFLUXDB_' @option(required=True, attrtype=str) def host(self): """ InfluxDB host """ host = os.environ.get('JOBSLIB_METRICS_INFLUXDB_HOST') if host: return host return self._settings.get('host', 'localhost') @option(attrtype=int) def port(self): """ InfluxDB port """ port = os.environ.get('JOBSLIB_METRICS_INFLUXDB_PORT') if port: return int(port) return self._settings.get('port', 8086) @option(required=True, attrtype=str) def username(self): """ InfluxDB username """ username = os.environ.get('JOBSLIB_METRICS_INFLUXDB_USERNAME') if username: return username return self._settings.get('username', 'root') @option(required=True, attrtype=str) def password(self): """ InfluxDB password """ password = os.environ.get('JOBSLIB_METRICS_INFLUXDB_PASSWORD') if password: return password return self._settings.get('password', 'root') @option(attrtype=str) def database(self): """ InfluxDB database """ database = os.environ.get('JOBSLIB_METRICS_INFLUXDB_DBNAME') if database: return database return self._settings['database'] def __init__(self, context, options): super().__init__(context, options) self._influxdb = InfluxDBClient( host=self.options.host, port=self.options.port, username=self.options.username, password=self.options.password, database=self.options.database, ) def push(self, metrics): @retrying.retry( stop_max_attempt_number=self.options.retry_max_attempts, wait_exponential_multiplier=self.options.retry_wait_multiplier) def _write_points(points): self._influxdb.write_points(points) current_dt = datetime.datetime.utcfromtimestamp(time.time()) ts = current_dt.strftime('%Y-%m-%dT%H:%M:%SZ') task_name = self.context.config.task_class.name try: points = [] for metric_name, metric_value in metrics.items(): tags = { 'task': task_name, } for k, v in metric_value.get('tags', {}).items(): if k in tags: raise Exception("Tag '{}' is reserved".format(k)) tags[k] = v metric = { 'measurement': metric_name, 'tags': tags, 'time': ts, 'fields': { 'value': float(metric_value['value']), }, } points.append(metric) _write_points(points) except Exception: logger.exception('Push monitoring metrics into InfluxDb failed')