Guide

https://elkguide.elasticsearch.cn/logstash/get-start/install.html

Architecture

Elasticsearch: real-time full-text search and analytics engine
Logstash: log collection, parsing, and filtering
Kibana: data visualization

Servers (producers) with Beats -> ZooKeeper / Kafka topics -> Logstash (consumer; ELK clusters split by business function) -> ES -> Kibana (beware of sensitive information leaking into logs)

Servers
/etc/hosts

30.3.229.120 develk01
30.3.229.121 develk02
30.3.229.122 develk03
30.3.229.123 devkafka01
30.3.229.124 devkafka02
30.3.229.125 devkafka03

Add a root-equivalent user
useradd -u 0 -o -g root -G root -d /root/ user1
echo "user1:passw0rD" | chpasswd

Installation

Upgrade Java to 1.8

Remove the old Java version
rpm -qa | grep jdk
yum -y remove jdk-1.7.0_79-fcs.x86_64
yum -y list java*
yum -y install java-1.8.0-openjdk.x86_64
rpm -i jdk-8u171-linux-x64.rpm
rpm -qa | grep logstash
rpm -e --nodeps logstash-5.6.10-1.noarch

Version selection

Beats
Elasticsearch
Elasticsearch Hadoop
Kibana
Logstash
X-Pack

Elasticsearch

/etc/elasticsearch/elasticsearch.yml # main Elasticsearch configuration
/etc/elasticsearch/jvm.options # JVM settings (heap size, etc.)
/etc/elasticsearch/log4j2.properties # logging configuration
/var/lib/elasticsearch # default data directory

Cluster configuration

/etc/elasticsearch/elasticsearch.yml

cluster.name: elk-cluster
node.name: ${HOSTNAME}
#node.master: true
#node.data: true
network.host: 0.0.0.0
http.port: 9200
path.data: /data/els/data
path.logs: /data/els/logs
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
discovery.zen.ping.unicast.hosts: ["develk01", "develk02", "develk03"]
discovery.zen.minimum_master_nodes: 2
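
Once all three nodes are started, a quick sanity check against any node (the -u credentials only matter once X-Pack is installed later on):

curl -u elastic:changeme 'http://develk01:9200/_cluster/health?pretty'
curl -u elastic:changeme 'http://develk01:9200/_cat/nodes?v'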

/etc/security/limits.conf

* soft nofile 65536
* hard nofile 65536

Startup errors

Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000085330000, 2060255232, 0) failed; error='Cannot allocate memory' (errno=12)

/etc/elasticsearch/jvm.options # shrink the heap so it fits in available memory

-Xms512m
-Xmx512m

[2]: max number of threads [1832] for user [elasticsearch] is too low, increase to at least [2048]

ulimit -u 2048
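
ulimit only changes the current session; to persist the limit for the elasticsearch user, the equivalent nproc entries can go into /etc/security/limits.conf:

elasticsearch soft nproc 2048
elasticsearch hard nproc 2048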

Create the data directories

rm -rf /data/
mkdir -p /data/els/{logs,data}
chown -R elasticsearch.elasticsearch /data/*
service elasticsearch start
or
bin/elasticsearch -d

Check the nodes

http://30.3.229.120:9200/_cat/nodes?pretty

Common operations

List indices
curl http://localhost:9200/_cat/indices?v

Delete an index
curl -u elastic:changeme -XDELETE http://localhost:9200/my_index

/_cat/health?v
/_cat/nodes?v

Install X-Pack
bin/elasticsearch-plugin install x-pack

Change the default X-Pack password
curl -XPUT -u elastic 'localhost:9200/_xpack/security/user/elastic/_password' -H 'Content-Type: application/json' -d '{"password" : "dfh*&(dUJ"}'

Monitoring with elasticsearch-head

yum install nodejs
yum install npm
npm install -g grunt-cli
git config --global https.proxy http://127.0.0.1:1080
git clone git://github.com/mobz/elasticsearch-head.git
npm config set strict-ssl false
npm config set registry https://registry.npm.taobao.org
npm config set proxy http://127.0.0.1:1080
npm info express
npm install
grunt server

Run in the background
nohup grunt server &

http://localhost:9100
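
With Elasticsearch 5.x the head UI queries the cluster directly from the browser, so CORS must be allowed in elasticsearch.yml on the node it connects to, otherwise every request is rejected:

http.cors.enabled: true
http.cors.allow-origin: "*"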

Logstash

[logstash-5.x]
name=Elastic repository for 5.x packages
baseurl=https://artifacts.elastic.co/packages/5.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

/etc/logstash/conf.d/elk.conf

input -> filter -> output

input {
  syslog {
    syslog_field => "syslog" # defaults to "message"
    port => 514
  }
  beats {
    port => 5044
  }
}
filter {
}
output {
  elasticsearch {
    hosts => ["192.168.200.109:9200"]
    index => "test-%{+YYYY.MM}"
  }
}

Start Logstash
nohup bin/logstash --debug --path.settings /etc/logstash/ -f config/test.conf > ls.log 2>&1 &
Multiple instances (each needs its own path.data)
bin/logstash -f config/syslog.conf --path.data=/tmp
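
Before starting, it is worth syntax-checking a pipeline; Logstash exits after validating:

bin/logstash --config.test_and_exit --path.settings /etc/logstash/ -f config/test.conf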

https://www.elastic.co/guide/en/logstash/5.6/config-examples.html

Writing to Kafka

input {
  stdin {}
}
output {
  kafka {
    topic_id => "test"
    codec => plain {
      format => "%{message}"
      charset => "UTF-8"
    }
    bootstrap_servers => "192.168.6.22:9092"
  }
  stdout {
    codec => rubydebug
  }
}
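
To confirm messages actually land on the topic, tail it with the console consumer (same broker as above):

bin/kafka-console-consumer.sh --bootstrap-server 192.168.6.22:9092 --topic test --from-beginning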

Reading from Kafka
Unlike earlier releases, the 5.0+ plugin connects to the Kafka broker addresses directly, not to ZooKeeper.

input {
  kafka {
    bootstrap_servers => ["192.168.6.22:9092"]
    #client_id => "test"
    group_id => "test"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    topics => ["test"]
  }
}
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "test-%{+YYYY.MM}"
    user => 'elastic'
    password => 'changeme'
  }
  stdout {
    codec => rubydebug
  }
}
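
Consumption progress (per-partition offsets and lag) for the group can be checked on the Kafka side, e.g. with an 0.11 broker:

bin/kafka-consumer-groups.sh --bootstrap-server 192.168.6.22:9092 --describe --group test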

Multiple processes

bin/logstash -f config/rsyslog.conf --path.data=/tmp

Install X-Pack monitoring
logstash-plugin install file:///tmp/x-pack-5.6.10.zip

nano config/logstash.yml

xpack.monitoring.elasticsearch.url: ["http://10.2.0.27:9200"]
xpack.monitoring.elasticsearch.username: "elastic"
xpack.monitoring.elasticsearch.password: "dfh*&(dUJ"
xpack.monitoring.enabled: true
xpack.monitoring.collection.interval: 10s

Kafka

Topic
Kafka files message feeds into categories; each category of messages is called a topic.
Producer
An object that publishes messages to a topic is called a producer.
Consumer
An object that subscribes to topics and processes the published messages is called a consumer.
Broker
Published messages are stored on a group of servers called a Kafka cluster; every server in the cluster is a broker. Consumers subscribe to one or more topics and pull data from the brokers to consume the published messages.

Because a broker splits each topic into partitions, ordering is guaranteed within a single partition but not across partitions. A consumer can therefore track its own read position per partition, the offset.
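
Records that must stay in order should therefore share a message key, since the default partitioner sends equal keys to the same partition. The console producer can demonstrate this with its parse.key/key.separator properties:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test \
  --property parse.key=true --property key.separator=:
# type lines like "user1:login ok"; every "user1" record lands in the same partition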

Compatibility
With Logstash 5.x, use Kafka 0.10.0.x.

# |==========================================================
# |Kafka Client Version |Logstash Version |Plugin Version |Why?
# |0.8 |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
# |0.9 |2.0.0 - 2.3.x | 3.x.x |Works with the old Ruby Event API (`event['product']['price'] = 10`)
# |0.9 |2.4.x - 5.x.x | 4.x.x |Works with the new getter/setter APIs (`event.set('[product][price]', 10)`)
# |0.10.0.x |2.4.x - 5.x.x | 5.x.x |Not compatible with the <= 0.9 broker
# |==========================================================

wget http://mirrors.hust.edu.cn/apache/kafka/0.11.0.2/kafka_2.11-0.11.0.2.tgz # the binary build; the -src tarball would need to be compiled first

Configure server.properties

broker.id=0
listeners=PLAINTEXT://kafka01:9092
advertised.listeners=PLAINTEXT://kafka01:9092

Adjust the JVM heap size in the startup script
export KAFKA_HEAP_OPTS="-Xmx512M -Xms256M"

Start the services
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zk.out 2>&1 &
nohup bin/kafka-server-start.sh config/server.properties > kafka.out 2>&1 &

bin/kafka-server-start.sh -daemon config/server.properties

Auto-create a topic (producing to a missing topic creates it when auto.create.topics.enable=true)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

List existing topics
bin/kafka-topics.sh --list --zookeeper localhost:2181

Describe the test topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Delete a topic

bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test # only marks the topic for deletion
bin/zookeeper-shell.sh localhost:2181 # delete it in ZooKeeper:
ls /brokers/topics
rmr /brokers/topics/test

Alternatively, set delete.topic.enable=true in server.properties.

Testing

Producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

The producer and consumer machines must have the Kafka hostnames in their /etc/hosts.

Cluster

zookeeper.properties

initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
server.0=kafka:2888:3888
server.1=kafka01:2889:3889
server.2=kafka02:2890:3890

Set each node's id under dataDir
mkdir -p /tmp/zookeeper/log && echo [server.id] > /tmp/zookeeper/myid
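
Matching the server.N entries above, each host gets its own id, e.g.:

echo 0 > /tmp/zookeeper/myid # on kafka
echo 1 > /tmp/zookeeper/myid # on kafka01
echo 2 > /tmp/zookeeper/myid # on kafka02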

server.properties

broker.id=0 # a unique id per broker
listeners=PLAINTEXT://kafka:9092
advertised.listeners=PLAINTEXT://kafka:9092
zookeeper.connect=kafka:2181,kafka01:2181,kafka02:2181

Testing

Create a topic, write messages to any broker, and read them back from any broker.

Create a topic
bin/kafka-topics.sh --create --zookeeper devkafka01:2181,devkafka02:2181,devkafka03:2181 --replication-factor 3 --partitions 3 --topic mytopic

List topics
bin/kafka-topics.sh --list --zookeeper localhost:2181

Describe the topic
bin/kafka-topics.sh --describe --zookeeper devkafka01:2181 --topic mytopic

Start a producer
bin/kafka-console-producer.sh --broker-list devkafka01:9092,devkafka02:9092,devkafka03:9092 --topic mytopic

Start a consumer
bin/kafka-console-consumer.sh --zookeeper devkafka01:2181,devkafka02:2181,devkafka03:2181 --from-beginning --topic mytopic

Monitoring

#!/bin/bash
java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--offsetStorage kafka \
--zk devkafka01,devkafka02,devkafka03 \
--port 8080 \
--refresh 10.seconds \
--retain 1.days

nohup ./kom.sh > /dev/null 2>&1 &

Load testing
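
The bundled perf tool gives a rough producer throughput number (record count and size here are illustrative):

bin/kafka-producer-perf-test.sh --topic mytopic --num-records 100000 --record-size 100 \
  --throughput -1 --producer-props bootstrap.servers=devkafka01:9092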

Kibana

/etc/kibana/kibana.yml

server.port: 5601 # Kibana listening port
server.host: "0.0.0.0" # listening IP
elasticsearch.url: "http://127.0.0.1:9200" # ES node to connect to
elasticsearch.username: "elastic"
elasticsearch.password: "changeme"

Offline X-Pack installation

bin/kibana-plugin install file:///tmp/x-pack-5.6.10.zip

service kibana start
or
nohup bin/kibana &
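
Once up, Kibana's status endpoint is a quick liveness check:

curl http://localhost:5601/api/status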

Charts

Pie chart of alert types
Bar chart of attack source addresses
Requested URL details

Grafana

Beats

Winlogbeat 5.6.10

Configuration
winlogbeat.yml

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.200.109:5044"]
#------------------------------- Kafka output ---------------------------------
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["192.168.6.22:9092"]
  topic: "test"

Logstash

input {
  beats {
    port => 5044
  }
}

Validate the configuration
.\winlogbeat.exe -c .\winlogbeat.yml -configtest -e

Install the service
PS C:\winlogbeat-5.6.10-windows-x86_64> .\install-service-winlogbeat.ps1
Start the service
net start winlogbeat

Flume

X-Pack

Default credentials
username: elastic
password: changeme

Cracking the license check

LicenseVerifier.java

package org.elasticsearch.license;

import java.nio.*;
import java.util.*;
import java.security.*;
import org.elasticsearch.common.xcontent.*;
import org.apache.lucene.util.*;
import org.elasticsearch.common.io.*;
import java.io.*;

public class LicenseVerifier {
    public static boolean verifyLicense(final License license, final byte[] encryptedPublicKeyData) {
        return true;
    }
    public static boolean verifyLicense(final License license) {
        return true;
    }
}

javac -cp "/usr/share/elasticsearch/lib/elasticsearch-5.6.10.jar:/usr/share/elasticsearch/lib/lucene-core-6.6.1.jar:/usr/share/elasticsearch/plugins/x-pack/x-pack-5.6.10.jar" LicenseVerifier.java
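
The compiled class then has to replace the original inside the x-pack jar; a sketch (restart Elasticsearch afterwards):

mkdir -p org/elasticsearch/license
cp LicenseVerifier.class org/elasticsearch/license/
jar -uf /usr/share/elasticsearch/plugins/x-pack/x-pack-5.6.10.jar org/elasticsearch/license/LicenseVerifier.class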

Register the new license (save the JSON below as l.json)

{"license":{"uid":"d3cbbbee-9155-4e1a-a5ed-a7e8940d6564","type":"platinum","issue_date_in_millis":1499299200000,"expiry_date_in_millis":2524579200999,"max_nodes":1000,"issued_to":"guo dalu (eastmoney)","issuer":"Web Form","signature":"AAAAAwAAAA0C9L3AjL50eKgiW55YAAABmC9ZN0hjZDBGYnVyRXpCOW5Bb3FjZDAxOWpSbTVoMVZwUzRxVk1PSmkxaktJRVl5MUYvUWh3bHZVUTllbXNPbzBUemtnbWpBbmlWRmRZb25KNFlBR2x0TXc2K2p1Y1VtMG1UQU9TRGZVSGRwaEJGUjE3bXd3LzRqZ05iLzRteWFNekdxRGpIYlFwYkJiNUs0U1hTVlJKNVlXekMrSlVUdFIvV0FNeWdOYnlESDc3MWhlY3hSQmdKSjJ2ZTcvYlBFOHhPQlV3ZHdDQ0tHcG5uOElCaDJ4K1hob29xSG85N0kvTWV3THhlQk9NL01VMFRjNDZpZEVXeUtUMXIyMlIveFpJUkk2WUdveEZaME9XWitGUi9WNTZVQW1FMG1DenhZU0ZmeXlZakVEMjZFT2NvOWxpZGlqVmlHNC8rWVVUYzMwRGVySHpIdURzKzFiRDl4TmM1TUp2VTBOUlJZUlAyV0ZVL2kvVk10L0NsbXNFYVZwT3NSU082dFNNa2prQ0ZsclZ4NTltbU1CVE5lR09Bck93V2J1Y3c9PQAAAQB2gL4WXN64P0+c5q6TDyhqPllFvkboZMWjzJHid05qCtI86/I0aSsFgYF3AkVA1qoz7UHsjC/xBsoyhuXfmHn6LbsZYXweZ4LsllG8RJ8HH/bBYVTBt+Mag+wXE/QZUS7HnSA8iAReQ7tY//wyuEVrxFDeAI9cgwWN90RoZ3sAgkzGq0jVr2JoUYeYwNJ4GZ2GMDS7GsHBxNWBJVgfDkZXvLya/jOJhaKi2GvW8mIzFp19/FO+t2+ReUkbF3T35nVIZnqFDVhXtOz981By4ArffE8ythlI4X67Nabtzoy87V5gXanBvsSdHiHpYJMrYwn7DU+93Ie6t56Lesjkj//b","start_date_in_millis":1499299200000}}

curl -XPUT -u elastic:changeme 'http://30.3.229.120:9200/_xpack/license?acknowledge=true' -d @l.json
curl -XGET -u elastic:changeme 'http://30.3.229.120:9200/_license'

Watcher

Watch logs containing alerts and send an email notification

Configure elasticsearch.yml

xpack.notification.email.account:
  exchange_account:
    profile: outlook
    email_defaults:
      from: user@domain.com
    smtp:
      auth: true
      starttls.enable: false
      host: mail.domain.com
      port: 587
      user: user
      password: pass

The watch below fires on documents that contain "event_type": "alert".

{
  "trigger": {
    // fire every 5 minutes
    "schedule": {
      "interval": "5m"
    }
  },
  "input": {
    // feed the search results into the watch payload
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": ["logstash-map-2018.07"],
        "types": [],
        "body": {
          "size": 0,
          "query": {
            // bool: both clauses must match
            "bool": {
              "must": [
                { "match": { "event_type": "alert" }},
                { "range": { "@timestamp": { "gte": "now-1h" }}}
              ]
            }
          }
        }
      }
    }
  },
  // condition that decides whether the actions run
  "condition": {
    "compare": {
      "ctx.payload.hits.total": {
        "gte": 10
      }
    }
  },
  // how to alert
  "actions": {
    "my-logging-action": {
      "logging": {
        "level": "info",
        "text": "Found {{ctx.payload.hits.total}} alerts in the last 5m."
      }
    }
  }
}
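
The watch is registered through the Watcher API; assuming the JSON above (with the // comments stripped) is saved as watch.json, and alert_watch is an arbitrary watch id:

curl -XPUT -u elastic:changeme 'http://localhost:9200/_xpack/watcher/watch/alert_watch' \
  -H 'Content-Type: application/json' -d @watch.json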

Aggregate users with failed logins within the last 5m

GET logstash-dc01-security-2018.08/_search
{
  "size": 1,
  "query": {
    "bool": {
      "must": [{ "match": { "event_id": 4625 }}],
      "filter": [{ "range": { "@timestamp": { "gte": "now-1d" }}}]
    }
  },
  "aggs": {
    "group_by_TargetUserName": {
      "terms": {
        "field": "event_data.TargetUserName.keyword"
      }
    }
  }
}

Result:

"aggregations": {
  "group_by_TargetUserName": {
    "doc_count_error_upper_bound": 0,
    "sum_other_doc_count": 0,
    "buckets": [
      {
        "doc_count": 6,
        "key": "test"
      },
      {
        "doc_count": 2,
        "key": "test1"
      },
      {
        "doc_count": 1,
        "key": "admin"
      }
    ]
  }
}

The buckets are an array:
ctx.payload.aggregations.group_by_TargetUserName.buckets

"condition": {
"array_compare": {
"ctx.payload.aggregations.group_by_TargetUserName.buckets" : {
"path": "doc_count" ,
"gte": {
"value": 25,
"quantifier": "some"
}
}
}
}

Actions

Send email

"send_email" : {
"throttle_period": "15m",
"email" : {
"to" : "<username>@<domainname>",
"cc": ["a@<domainname>","b@<domainname>"]
"subject" : "Watcher Notification",
"body" : "Top10 users:\n{{#ctx.payload.aggregations.topn.buckets}}\n{{key}} {{doc_count}}\n{{/ctx.payload.aggregations.topn.buckets}}",
"attachments" : {
"attached_data" : {
"data" : {
"format" : "json"
}
}
},
"priority" : "high"
}
}

webhook
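
A webhook action pushes the alert to an arbitrary HTTP endpoint instead; a minimal sketch (host and path are hypothetical):

"notify_webhook": {
  "webhook": {
    "scheme": "https",
    "method": "POST",
    "host": "hooks.example.com",
    "port": 443,
    "path": "/elk-alerts",
    "body": "{{ctx.payload.hits.total}} alerts in the last hour"
  }
}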

Suricata

https://suricata-ids.org/

Installation

sudo yum -y install gcc libpcap-devel pcre-devel libyaml-devel file-devel \
zlib-devel jansson-devel nss-devel libcap-ng-devel libnet-devel tar make \
libnetfilter_queue-devel lua-devel
wget https://www.openinfosecfoundation.org/download/suricata-4.0.4.tar.gz
tar -zxvf suricata-4.0.4.tar.gz
cd suricata-4.0.4
./configure --prefix=/usr --sysconfdir=/etc --localstatedir=/var --enable-nfqueue --enable-lua
make && make install
make install-full

suricata -c /etc/suricata/suricata.yaml -i eth1
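
A quick end-to-end check: request the public NIDS test page and confirm an alert appears in fast.log (paths assume the configure flags above):

curl http://testmynids.org/uid/index.html
tail /var/log/suricata/fast.log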

Updating rules

pip install suricata-update

Rule sources (typical suricata-update usage shown after the list)
https://www.openinfosecfoundation.org/rules/index.yaml

et/open: https://rules.emergingthreats.net/open/suricata-%(__version__)s/emerging.rules.tar.gz
et/pro: https://rules.emergingthreatspro.com/%(secret-code)s/suricata-%(__version__)s/etpro.rules.tar.gz
oisf/trafficid: https://raw.githubusercontent.com/jasonish/suricata-trafficid/master/rules/traffic-id.rules
ptresearch/attackdetection: https://raw.githubusercontent.com/ptresearch/AttackDetection/master/pt.rules.tar.gz
scwx/malware: https://ws.secureworks.com/ti/ruleset/%(secret-code)s/Suricata_suricata-malware_latest.tgz
scwx/security: https://ws.secureworks.com/ti/ruleset/59af35658a44c415/Suricata_suricata-security_latest.tgz
sslbl/ssl-fp-blacklist: https://sslbl.abuse.ch/blacklist/sslblacklist.rules
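
Typical suricata-update usage against these sources (et/open is enabled by default):

suricata-update update-sources # refresh the source index
suricata-update list-sources # show available sources
suricata-update enable-source ptresearch/attackdetection
suricata-update # download, merge, and write the ruleset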

Log collection

Syslog

syslog log format

WAF

input {
  syslog {
    timezone => "Asia/Shanghai"
    id => "my_plugin_id"
    port => 514
  }
}
filter {
  # drop waf_log_wafstat
  if [severity] == 6 {
    drop { }
  }
  # waf log
  if [severity] == 3 {
    grok {
      match => { "message" => "tag:%{DATA:tag}\s*site_id:%{INT:site_id}\s*protect_id:%{INT:protect_id}\s*dst_ip:%{IPORHOST:dst_ip}\s*dst_port:%{INT:dst_port}\s*src_ip:%{IPORHOST:src_ip}\s*src_port:%{INT:src_port}\s*method:%{DATA:method}\s*domain:%{DATA:domain}\s*uri:%{DATA:uri}\s*alertlevel:%{DATA:alert_level}\s*event_type:%{DATA:event_type}\s*stat_time:%{TIMESTAMP_ISO8601:stat_time}\s*policy_id:%{INT:policy_id}\s*rule_id:%{INT:rule_id}\s*action:%{DATA:action}\s*block:%{DATA:block}\s*block_info:%{DATA:block_info}\s*http:%{DATA:http}\s*alertinfo:%{DATA:alertinfo}\s*proxy_info:%{DATA:proxy_info}\s*characters:%{DATA:characters}\s*count_num:%{INT:count_num}\s*protocol_type:%{DATA:protocol_type}\s*wci:%{DATA:wci}\s*wsi:%{DATA:wsi}\s*country:%{DATA:country}"}
    }
    mutate {
      remove_field => ["message"]
    }
  }
}
output {
  elasticsearch {
    hosts => ["http://develk01:9200","http://develk02:9200","http://develk03:9200"]
    user => "elastic"
    password => "changeme"
    index => "waf-cs-syslog-217"
  }
  stdout { codec => rubydebug }
}
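
When tweaking the grok pattern, a throwaway stdin pipeline makes iteration fast: paste one raw WAF line and inspect the parsed fields. The pattern here is truncated; reuse the full one from the filter above:

input { stdin {} }
filter {
  grok {
    # truncated for brevity; paste the full WAF pattern from above
    match => { "message" => "tag:%{DATA:tag}\s*site_id:%{INT:site_id}" }
  }
}
output { stdout { codec => rubydebug } }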

sysmon

input {
  beats {
    port => 5044
  }
}
filter {
  mutate {
    split => ["message","\r"]
    remove_field => ["message","beat","@version"]
    lowercase => ["host"] # index must be lower case
  }
}
# source_name values of interest: "Microsoft-Windows-Security-Auditing" and "Microsoft-Windows-Sysmon"
output {
  elasticsearch {
    hosts => ["http://develk01:9200"]
    index => "logstash-%{[host]}-%{+YYYY.MM.dd}"
  }
}