В статье описана настройка центрального логирования для разных типов приложений (Python, Java (java.util.logging), Go, bash) с помощью довольно нового проекта Heka.
Heka разрабатывается в Mozilla и написана на Go. Именно поэтому я использую его вместо logstash, который имеет сходные возможности.
Heka основан на плагинах, которые имеют пять типов:
- Входные — каким-то образом принимают данные (слушает порты, читают файлы и др.);
- Декодеры — обрабатывают входящие запросы и переводят их во внутренние структуры для сообщений;
- Фильтры — производят какие либо действия с сообщениями;
- Encoders (неясно, как переводить) — кодируют внутренние сообщения в форматы, которые отправляются через выходные плагины;
- Выходные — отправляют куда-либо данные.
Например, в случае Java-приложений входным плагином является LogstreamerInput, который смотрит за изменениями в файле логов. Новые строки в логе обрабатываются декодером PayloadRegexDecoder (по заданному формату) и дальше отправляются в elasticsearch через выходной плагин ElasticSearchOutput. Выходной плагин в свою очередь кодирует сообщение из внутренней структуры в формат elasticsearch через ESJsonEncoder.
Установка Heka
Все способы установки описаны на сайте (http://hekad.readthedocs.org/en/v0.8.2/installing.html#binaries). Но проще всего скачать готовый бинарный пакет под свою систему со страницы https://github.com/mozilla-services/heka/releases.
Поскольку у меня используются сервера под ubuntu, то и описание будет для этой системы. В данном случае установка сводится к установке самого deb пакета, настройке файла конфигурации /etc/hekad.toml и добавления в сервисы upstart.
В базовую настройку /etc/hekad.toml у меня входит настройка количества процессов (я ставлю равным количеству ядер), dashboard (в котором можно посмотреть какие плагины включены) и udp сервер на 5565 порту, который ожидает сообщения по протоколу google protobuf (используется для python и go приложений):
maxprocs = 4
[Dashboard]
type = "DashboardOutput"
address = ":4352"
ticker_interval = 15
[UdpInput]
address = "127.0.0.1:5565"
parser_type = "message.proto"
decoder = "ProtobufDecoder"
Конфигурация для upstart /etc/init/hekad.conf:
start on runlevel [2345]
respawn
exec /usr/bin/hekad -config=/etc/hekad.toml
Логирование Python приложений
Здесь используется библиотека https://github.com/kalail/heka-py и специальный хендлер для модуля logging. Код:
import logging
from traceback import format_exception
try:
import heka
HEKA_LEVELS = {
logging.CRITICAL: heka.severity.CRITICAL,
logging.ERROR: heka.severity.ERROR,
logging.WARNING: heka.severity.WARNING,
logging.INFO: heka.severity.INFORMATIONAL,
logging.DEBUG: heka.severity.DEBUG,
logging.NOTSET: heka.severity.NOTICE,
}
except ImportError:
heka = None
class HekaHandler(logging.Handler):
_notified = None
conn = None
host = '127.0.0.1:5565'
def __init__(self, name, host=None):
if host is not None:
self.host = host
self.name = name
super(HekaHandler, self).__init__()
def emit(self, record):
if heka is None:
return
fields = {
'Message': record.getMessage(),
'LineNo': record.lineno,
'Filename': record.filename,
'Logger': record.name,
'Pid': record.process,
'Exception': '',
'Traceback': '',
}
if record.exc_info:
trace = format_exception(*record.exc_info)
fields['Exception'] = trace[-1].strip()
fields['Traceback'] = ''.join(trace[:-1]).strip()
msg = heka.Message(
type=self.name,
severity=HEKA_LEVELS[record.levelno],
fields=fields,
)
try:
if self.conn is None:
self.__class__.conn = heka.HekaConnection(self.host)
self.conn.send_message(msg)
except:
if self.__class__._notified is None:
print "Sending HEKA message failed"
self.__class__._notified = True
По умолчанию хендлер ожидает, что Heka слушает на порту 5565.
Логирование Go приложений
Для логирования я форкнул библиотеку для логирования https://github.com/ivpusic/golog и добавил туда возможность отправки сообщений прямо в Heka. Результат расположен здесь: https://github.com/ildus/golog
Использование:
import "github.com/ildus/golog"
import "github.com/ildus/golog/appenders"
...
logger := golog.Default
logger.Enable(appenders.Heka(golog.Conf{
"addr": "127.0.0.1",
"proto": "udp",
"env_version": "2",
"message_type": "myserver.log",
}))
...
logger.Debug("some message")
Логирование Java приложений
Для Java приложений используется входной плагин типа LogstreamerInput с специальным regexp декодером. Он читает логи приложения из файлов, которые должны быть записаны в определенном формате.
Конфигурация для heka, отвечающая за чтение и декодирование логов:
[JLogs]
type = "LogstreamerInput"
log_directory = "/some/path/to/logs"
file_match = 'app_(?P<Seq>d+.d+).log'
decoder = "JDecoder"
priority = ["Seq"]
[JDecoder]
type = "PayloadRegexDecoder"
#Parses com.asdf[INFO|main|2014-01-01 3:08:06]: Server started
match_regex = '^(?P<LoggerName>[w.]+)[(?P<Severity>[A-Z]+)|(?P<Thread>[wd-]+)|(?P<Timestamp>[d-s:]+)]: (?P<Message>.*)'
timestamp_layout = "2006-01-02 15:04:05"
timestamp_location = "Europe/Moscow"
[JDecoder.severity_map]
SEVERE = 3
WARNING = 4
INFO = 6
CONFIG = 6
FINE = 6
FINER = 7
FINEST = 7
[JDecoder.message_fields]
Type = "myserver.log"
Message = "%Message%"
Logger = "%LoggerName%"
Thread = "%Thread%"
Payload = ""
В приложении надо изменить Formatter через logging.properties. Пример logging.properties:
handlers= java.util.logging.FileHandler java.util.logging.ConsoleHandler
java.util.logging.FileHandler.level=ALL
java.util.logging.FileHandler.pattern = logs/app_%g.%u.log
java.util.logging.FileHandler.limit = 1024000
java.util.logging.FileHandler.formatter = com.asdf.BriefLogFormatter
java.util.logging.FileHandler.append=tru
java.util.logging.ConsoleHandler.level=ALL
java.util.logging.ConsoleHandler.formatter=com.asdf.BriefLogFormatter
Код BriefLogFormatter:
package com.asdf;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.logging.Formatter;
import java.util.logging.LogRecord;
public class BriefLogFormatter extends Formatter {
private static final DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static final String lineSep = System.getProperty("line.separator");
/**
* A Custom format implementation that is designed for brevity.
*/
public String format(LogRecord record) {
String loggerName = record.getLoggerName();
if(loggerName == null) {
loggerName = "root";
}
StringBuilder output = new StringBuilder()
.append(loggerName)
.append("[")
.append(record.getLevel()).append('|')
.append(Thread.currentThread().getName()).append('|')
.append(format.format(new Date(record.getMillis())))
.append("]: ")
.append(record.getMessage()).append(' ')
.append(lineSep);
return output.toString();
}
}
Логирование скриптов (bash)
Для bash в heka добавляется входной фильтр TcpInput (который слушает на определенном порту) и PayloadRegexDecoder для декодирования сообщений. Конфигурация в hekad.toml:
[TcpInput]
address = "127.0.0.1:5566"
parser_type = "regexp"
decoder = "TcpPayloadDecoder"
[TcpPayloadDecoder]
type = "PayloadRegexDecoder"
#Parses space_checker[INFO|2014-01-01 3:08:06]: Need more space on disk /dev/sda6
match_regex = '^(?P<LoggerName>[w.-]+)[(?P<Hostname>[^|]+)|(?P<Severity>[A-Z]+)|(?P<Timestamp>[d-s:]+)]: (?P<Message>.*)'
timestamp_layout = "2006-01-02 15:04:05"
timestamp_location = "Europe/Moscow"
[TcpPayloadDecoder.severity_map]
ERROR = 3
WARNING = 4
INFO = 6
ALERT = 1
[TcpPayloadDecoder.message_fields]
Type = "scripts"
Message = "%Message%"
Logger = "%LoggerName%"
Hostname = "%Hostname%"
Payload = "[%Hostname%|%LoggerName%] %Message%"
Для логирования написана функция, которая отправляет сообщения на TCP порт в заданном формате:
log()
{
if [ "$1" ]; then
echo -e "app1[`hostname`|INFO|`date '+%Y-%m-%d %H:%M:%S'`]: $1" | nc 127.0.0.1 5566 || true
echo $1
fi
}
Отправка сообщения с уровнем INFO с типом app1:
log "test test test"
Отправка записей в elasticsearch
Добавляется следующая конфигурация в hekad.conf:
[ESJsonEncoder]
index = "heka-%{Type}-%{2006.01.02}"
es_index_from_timestamp = true
type_name = "%{Type}"
[ElasticSearchOutput]
message_matcher = "Type == 'myserver.log' || Type=='scripts' || Type=='nginx.access' || Type=='nginx.error'"
server = "http://<elasticsearch_ip>:9200"
flush_interval = 5000
flush_count = 10
encoder = "ESJsonEncoder"
Здесь мы указываем, где находится elasticsearch, как должны формироваться индексы и какие типы сообщений туда отправлять.
Просмотр логов
Для просмотра логов используется Kibana 4. Она все еще в бете, но уже вполне рабочая. Для установки надо скачать архив со страницы http://www.elasticsearch.org/overview/kibana/installation/, распаковать в какую либо папку на сервере и указать расположение elasticsearch сервера в файле config/kibana.yml (параметр elasticsearch_url).
При первом запуске понадобится добавить индексы во вкладке Settings. Чтобы можно было добавить шаблон индекса и правильно определились поля, необходимо отправить тестовое сообщение из каждого приложения. Потом можно добавить шаблон индекса вида heka-*(которое покажет все сообщения) или heka-scripts-*, тем самым отделив приложения друг от друга. Дальше можно перейти вкладку Discover и посмотреть сами записи.
Заключение
Я показал только часть того, что можно логировать с помощью Heka.
Если кто-либо заинтересовался, то могу показать часть Ansible конфигурации, которая автоматически устанавливает heka на все серверы, а на выбранные elasticsearch с kibana.
Автор: ildus