Доброго времени суток. В процессе обслуживания большого количества серверов возникает необходимость централизации управления всем зоопарком машин, а так же централизованный сбор логов и аналитика логов на предмет выявления аномалий, ошибок и общей статистики.
В качестве централизованного сбора логов используется rsyslog, а для структурирования и визуализации elasticsearch + kibana. Все бы ничего, но когда количество подключенных машин разрастается, то данных настолько много, что уходит (уходило) большое количество времени на их обработку и анализ. Наряду с другими интересными штуками всегда хотелось организовать свой центр безопасности. Этакая мультимониторная статистика с картами, графиками и прочим.
В данной статье хочу описать свой опыт создания монитора статистики по атакам на ссш. Не будем рассматривать в этом плане защиту, потому как программисты и прочие могут не уметь подключаться на нестандартные порты или пользоваться сертификатами.
Так как у нас уже есть развернутый elastic с kibana, то будем на его базе и городить нашу систему.
Итак, у нас уже есть установленный докер и docker-compose, значит будем поднимать сервисы на нем.
elastic:
elasticsearch:
build: elasticsearch:2.3.4
container_name: elastic
command: elasticsearch -Des.network.host=0.0.0.0
net: host
ports:
- "9200:9200"
- "9300:9300"
volumes:
- "/srv/docker/elastic/etc:/usr/share/elasticsearch/config"
- "/srv/docker/elastic/db:/usr/share/elasticsearch/data"
- "/etc/localtime:/etc/localtime:ro"
restart: always
environment:
- ES_HEAP_SIZE=2g
/srv/docker/elastic/elasticsearch.yml:
cluster.name: Prod
node.name: "central-syslog"
http.port: 9200
network.host: _non_loopback_
discovery.zen.ping.multicast.enabled: false
discovery.zen.ping.unicast.hosts: [
]
transport.publish_host: 0.0.0.0
#transport.publish_port: 9300
http.cors.enabled : true
http.cors.allow-origin : "*"
http.cors.allow-methods : OPTIONS, HEAD, GET, POST, PUT, DELETE
http.cors.allow-headers : X-Requested-With,X-Auth-Token,Content-Type, Content-Length
script.engine.groovy.inline.aggs: on
/srv/docker/elastic/logging.yml:
logger:
action: DEBUG
com.amazonaws: WARN
appender:
console:
type: console
layout:
type: consolePattern
conversionPattern: "[%d{ISO8601}][%-5p][%-25c] %m%n"
kibana:
kibana:
image: kibana
restart: always
container_name: kibana
environment:
SERVICE_NAME: 'kibana'
ELASTICSEARCH_URL: "http://x.x.x.x:9200"
ports:
- "4009:5601"
volumes:
- "/etc/localtime:/etc/localtime:ro"
logstash:
logstash:
image: logstash:latest
restart: always
container_name: logstash
hostname: logstash
ports:
- "1025:1025"
- "1026:1026"
volumes:
- "/srv/docker/logstash/logstash.conf:/etc/logstash.conf:ro"
- "/srv/docker/logstash/ssh-map.json:/etc/ssh-map.json:ro"
command: "logstash -f /etc/logstash.conf"
Итак. Мы запустили эластик и кибану, но осталось подготовить логстеш к обработке логов от внешних серверов. Я реализовал вот такую схему:
rsyslog → logstash → elastic → kibana.
Можно было бы использовать встроенный в rsyslog коннектор напрямую в эластик, но нам нужны данные по полям и с geoip для статистики.
На серверах, подключаемых к мониторингу вносим в конфиг rsyslog (обычно это /etc/rsyslog.d/50-default.conf) вот такую запись:
auth,authpriv.* @@x.x.x.x:1026
Этой записью мы отправляем все события об авторизации на наш удаленный сервер (logstash).
Далее, полученные логстешем логи нам нужно обработать и оформить. Для этого создаем маппинг полей, чтобы на выходе нам было удобно работать (/srv/docker/logstash/ssh-map.json):
{
"template": "logstash-*",
"mappings": {
"ssh": {
"properties": {
"@timestamp": {
"type": "date",
"format": "strict_date_optional_time||epoch_millis"
},
"@version": {
"type": "string"
},
"username": {
"type": "string"
},
"src_ip": {
"type": "string"
},
"port": {
"type": "long"
},
}
}
}
}
В ходе создания маппинга столкнулся с одним багом логстеша, а именно присвоению полю значения geo_point (при создании своего индекса выставляется значение у geoip.location — float), по которому в дальнейшем будет строиться heatmap на карте. Баг этот зарегистрирован и в качестве workaround мне пришлось использовать шаблон стандартных индексов logstash-*.
Итак, маппинг у нас есть. Теперь нужно подготовить конфиг logstash, чтобы он фильтровал входящие данные и в нужном формате отдавал в эластик (/srv/docker/logstash/logstash.conf):
input {
tcp {
port => 1026
type => "security"
}
}
filter {
grok {
match => ["message", "Failed password for (invalid user |)%{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2"]
add_tag => "ssh_brute_force_attack"
}
grok {
match => ["message", "Accepted password for %{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2"]
add_tag => "ssh_sucessful_login"
}
geoip {
source => "src_ip"
}
}
output {
if "ssh_brute_force_attack" in [tags] {
elasticsearch {
hosts => ["x.x.x.x:9200"]
index => "logstash-%{+YYYY.MM.dd}"
manage_template => true
template_name => "ssh"
template => "/etc/ssh-map.json"
template_overwrite => true
}
}
}
Конфиг удобочитаемый, понятный, поэтому комментировать считаю излишним.
Итак. Логи ссш попадают в логстеш, он их обрабатывает и отправляет в индексы эластика. Осталось только настроить визуализацию:
— Открываем веб интерфейс по адресу x.x.x.x:4009/
— Переходим в Settings и добавляем работу с нашими индексами (logstash-*)
Далее нам нужно в kibana создать поисковые запросы, визуализацию и дашборд.
Во вкладке Discover после добавления индексов в kibana мы видим наши записи — все настроили верно.
В левой колонке мы видим список полей для фильтрации, с ними и будем работать.
Первым фильтром пойдет список атакуемых серверов:
— около поля host нажимаем add
— сохраняем поиск как ssh-brute-servers-under-attack (имя вариативно)
Вторым фильтром будет список атакующих стран:
— около поля geoip.country_name нажимаем add
— сохраняем как ssh-brute-countries (имя вариативно)
Третьим фильтром будет общее количество атак:
— переходим на вкладку Discovery
— сохраняем как ssh-brute-all
Итак, на финальном экране у нас будет отображаться четыре разных параметра:
1. Суммарное количество атак
2. Атакующие страны
3. Атакуемые серверы
4. Карта с указателями на атакующие хосты
Суммарное количество атак:
— переходим на вкладку Visualize
— выбираем тип визуализации Metric
— From saved search — ssh-brute-all
— Открываем Metric и меняем значение поля на — Суммарное количество атак
— Сохраняем визуализацию
Атакующие страны:
— переходим на вкладку Visualize
— выбираем тип визуализации Data table
— From saved search — ssh-brute-countries
— Открываем Metric и меняем значение поля на — Количество атак
— Теперь нам нужно соотнести поля и посчитать в таблице «уники». Нажимаем split rows
— Aggregation — terms
— Field — geoip.country_name.raw
— Custom label — Страна
Если все ввели верно, то загорится зеленая кнопка play, после нажатия на которую увидим примерно такую картину:
— Сохраняем визуализацию
Атакуемые серверы:
— переходим на вкладку Visualize
— выбираем тип визуализации Data table
— From saved search — ssh-brute-servers-under-attack
— Открываем Metric и меняем значение поля на — Количество атак
— Теперь нам нужно соотнести поля и посчитать в таблице «уники». Нажимаем split rows
— Aggregation — terms
— Field — host.raw
— Custom label — Сервер
Если все ввели верно, то загорится зеленая кнопка play, после нажатия на которую увидим примерно такую картину:
— Сохраняем визуализацию
Карта с указателями на атакующие хосты (самое интересное)
— переходим на вкладку Visualize
— выбираем тип визуализации Tile map
— From new search — Select an index pattern — logstash-*
— Открываем Geo Coordinates. Если все шаги были верными, автоматически поле Field заполнится geoip.location
— Переходим в Options
— Меняем
— Приводим все поля к параметрам:
WMS Url — basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer
WMS layers* — 0
WMS version* — 1.3.0
WMS format* — image/png
WMS attribution — Maps provided by USGS
WMS styles* — пусто
В итоге у нас должна отобразиться карта атак:
— Сохраняем визуализацию
Теперь у нас всё есть для того, чтобы сделать свой дашборд.
Переходим на вкладку Dashboard, нажимаем на Add visualization (плюс в круге справа сверху) и добавляем свои сохраненные визуализации на экран и сохраняем экран. Поддерживается Drag'n'Drop. В итоге у меня получился вот такой экран:
Теперь для подключения новых хостов к системе достаточно будет указывать у них передачу логов авторизации на сервер logstash и при возникновении событий хосты и информация будут добавляться на экран и в эластик.
При желании вы всегда сможете добавить логи брутфорса контрольных панелей, собрать более детализированную статистику и добавить на экран по аналогии с этой статьей.
Автор: nikolayvaganov