#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Tedega provides logging capabilities to applications to ensure that the
data is logged in a consistent manner, which is a prerequisite for later
evaluation.
Tedega uses Fluentd Logs for centralized logging. This collects all logs
into a uniform form, and stores them in different backends as needed.
From there, the logs can then be analyzed using tools such as
Elasticsearch or Hadoop.
The following categories are logged:
* **Requests**. All requests to the application are logged. This
includes the URL, method (GET, POST, PUT ...) and possible parameters. They
are marked in the category *REQUEST*.
* **Processing time**. Every request to a service will have the
response time logged in milliseconds that a service needs to answer a
request. The time includes all steps necessary to answer the request,
including possible queries to other services. The category
for the processing time is *PROCTIME*
* **Status response**. Each request logs the HTTP status of it
answer. The category for the status is *RETURNCODE*
* **Reachability**. Periodically, each service will check its
connectivity to other hosts and services. Accessibility messages have the
category *PING*.
* **Utilization RAM, CPU, memory**. We pick up at regular intervals
information about the load of RAM, load and
storage space in order to detect bottlenecks early. The
corresponding category is *SYSTEM*.
* **More information**. In addition to the information described above,
of course also any other information can be logged as needed. These
should then be categorized with *CUSTOM*.
So that the messages can be systematically evaluated in a central
location, all messages must be in a predefined format. Each log message
has a tag which Fluentd uses to route log messages::
    <HOST>.<SERVICE>[.<CONTAINER>]
The actual log message is saved in JSON format::
{
"type": "INFO",
"category": null,
"correlation_id": null,
"extra_key_depending_on_category": "value"
}
The following table provides information about the most important tags
in log messages.
============== ================
Section Description
============== ================
HOST Name of the computer.
CONTAINER Name Containers.
SERVICE Name of the service
CATEGORY Type of log message.
CORRELATION_ID When a request first encounters a service, it generates a unique UUID, which is used in all subsequent queries to track related messages across different services. This information is optional because not all messages are generated in services or require such a UUID.
LEVEL Indicates whether the message is an ERROR, a WARNING, an INFO, or a DEBUG output. The default for a message is INFO.
============== ================
"""
import collections
import functools
import logging
import os
import socket
import threading
import time

import psutil
import voorhees
from fluent import handler
# Extra fields injected into every Fluentd record by the
# FluentRecordFormatter ("%(hostname)s" is resolved per record).
custom_format = {
    'host': '%(hostname)s',
    # 'where': '%(module)s.%(funcName)s',
    # 'type': '%(levelname)s',
    # 'stack_trace': '%(exc_text)s'
}

# Valid values for the "category" field of a log message. See the module
# docstring for the meaning of each category.
CATEGORIES = ["PING", "SYSTEM", "PROCTIME",
              "RETURNCODE", "REQUEST", "AUTH", "CUSTOM"]

# Global :class:`Logger` instance; set by :func:`init_logger` and
# retrieved via :func:`get_logger`.
log = None
def log_proctime(func):
    """Decorator to log the processing time of the decorated method.

    The elapsed wall-clock time in seconds and the fully qualified name
    of the wrapped callable are logged in the *PROCTIME* category using
    the module-global :class:`Logger` (see :func:`init_logger`).

    :func: Callable to be wrapped.
    :returns: Wrapped callable with the original metadata preserved.
    """
    # functools.wraps keeps __name__/__doc__ of the wrapped callable
    # intact; without it every decorated function reports as "wrap".
    @functools.wraps(func)
    def wrap(*args, **kwargs):
        started_at = time.time()
        result = func(*args, **kwargs)
        # Log after the call so the measured time covers the full call.
        log.info({"time": time.time() - started_at,
                  "func": "%s.%s" % (func.__module__, func.__name__)},
                 "PROCTIME")
        return result
    return wrap
def monitor_system(interval=300, duration=10):
    """Continually log CPU, RAM and disk usage.

    Starts a daemon thread that records a *SYSTEM* log entry every
    ``interval`` seconds. The CPU value is averaged over ``duration``
    seconds.

    :interval: Check will be executed every X seconds
    :duration: CPU is the average of the given duration
    """
    def _run():
        while True:
            _log_system(duration)
            time.sleep(interval)

    # Daemon thread: must not keep the process alive on shutdown.
    thread = threading.Thread(name="monitor_system", target=_run,
                              daemon=True)
    thread.start()
def _log_system(interval):
    """Collect the current CPU, RAM and disk usage via psutil and log
    the values in the *SYSTEM* category.

    :interval: Number of seconds over which the CPU load is averaged.
    """
    usage = {
        "cpu": psutil.cpu_percent(interval=interval, percpu=False),
        "memory": psutil.virtual_memory().percent,
        "disk": psutil.disk_usage('/').percent,
    }
    log.info(usage, "SYSTEM")
def monitor_connectivity(hosts, interval=60):
    """Continually check and log the connection to the given hosts.

    Starts a daemon thread that records a *PING* log entry every
    ``interval`` seconds.

    :hosts: List of tuples (hostname, port)
    :interval: Check will be executed every X seconds
    """
    def _run(targets):
        while True:
            _log_connectivity(targets)
            time.sleep(interval)

    # Daemon thread: must not keep the process alive on shutdown.
    thread = threading.Thread(name="monitor_connectivity", target=_run,
                              args=(hosts,), daemon=True)
    thread.start()
def _log_connectivity(hosts, timeout=5):
    """Try to open a TCP connection to every given host and log the
    per-host reachability in the *PING* category.

    :hosts: List of tuples (hostname, port)
    :timeout: Seconds to wait for each connection attempt before the
        host is considered unreachable. Defaults to 5 so a dead host
        cannot block the monitor thread for the OS-default timeout.
    """
    result = collections.OrderedDict()
    for host, port in hosts:
        try:
            # Close the probe socket immediately; the original
            # implementation leaked one open socket per reachable host.
            with socket.create_connection((host, port), timeout=timeout):
                result[host] = True
        except OSError:
            result[host] = False
    log.info(result, "PING")
class Logger(object):
    """Thin wrapper around a standard Python logger that enforces the
    predefined JSON message format of this module."""

    def __init__(self, logger, service):
        # Underlying Python logger all messages are emitted through.
        self._logger = logger
        # Service name included in every message.
        self._service = service

    def _build_message(self, message, category, correlation_id):
        """Serialise *message* into the canonical JSON structure.

        Raises :class:`ValueError` when *category* is neither ``None``
        nor one of the known ``CATEGORIES``.
        """
        if category is not None and category not in CATEGORIES:
            raise ValueError("{} logging category unknown.".format(category))
        # Fixed key order: service, category, correlation_id, payload.
        msg = collections.OrderedDict([
            ("service", self._service),
            ("category", category),
            ("correlation_id", correlation_id),
        ])
        if isinstance(message, dict):
            msg.update(message)
        else:
            msg["message"] = message
        return voorhees.to_json(msg)

    def _log(self, emit, message, category, correlation_id):
        """Serialise *message* and emit it via the bound *emit* method."""
        emit(self._build_message(message, category, correlation_id))

    def debug(self, message, category=None, correlation_id=None):
        """Write a debug message."""
        self._log(self._logger.debug, message, category, correlation_id)

    def info(self, message, category=None, correlation_id=None):
        """Write an info message."""
        self._log(self._logger.info, message, category, correlation_id)

    def error(self, message, category=None, correlation_id=None):
        """Write an error message."""
        self._log(self._logger.error, message, category, correlation_id)

    def warning(self, message, category=None, correlation_id=None):
        """Write a warning message."""
        self._log(self._logger.warning, message, category, correlation_id)
def build_tag(service):
    """Build the tag used to route fluentd log messages.

    The tag has the format ``<hostname>.<service>[.<container>]``.

    The ``hostname`` is taken from the ``DOCKER_HOSTNAME`` environment
    variable; if unset (e.g. the service is not running in a docker
    container) the system's hostname is used instead. The ``container``
    part is the system hostname, which equals the docker container ID
    inside a container. When it matches ``hostname`` it is omitted.

    :service: Name of the microservice
    :returns: Logging tag
    """
    container = socket.gethostname()
    hostname = os.environ.get("DOCKER_HOSTNAME", container)
    parts = [hostname, service]
    # Only append the container ID when it adds information.
    if container != hostname:
        parts.append(container)
    return ".".join(parts)
def init_logger(service, host="fluentd", port=24224):
    """Initialise the global :class:`Logger` instance to log to fluentd.

    The fluentd routing tag is derived from *service* via
    :func:`build_tag`. After this call :func:`get_logger` returns the
    configured instance.

    :service: Name of the microservice (used to build the fluentd tag).
    :host: Host where the fluentd is listening
    :port: Port where the fluentd is listening
    """
    tag = build_tag(service)
    logging.basicConfig(level=logging.INFO)
    python_logger = logging.getLogger(tag)
    fluent_handler = handler.FluentHandler(tag, host=host, port=port)
    fluent_handler.setFormatter(handler.FluentRecordFormatter(custom_format))
    python_logger.addHandler(fluent_handler)
    global log
    log = Logger(python_logger, service)
def get_logger():
    """Return the global :class:`Logger` instance.

    :returns: Logger instance
    :raises ValueError: If :func:`init_logger` has not been called yet.
    """
    # Reading a module global needs no ``global`` statement.
    if not log:
        raise ValueError("Logger is not initialised!")
    return log