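Guide
https://elkguide.elasticsearch.cn/logstash/get-start/install.html
Architecture
Elasticsearch: real-time full-text search and analytics engine
Logstash: log collection, parsing, and filtering
Kibana: data visualization

Server (producer) Beats -> ZooKeeper/Kafka topic -> (split ELK clusters by business function) Logstash (consumer) -> ES -> Kibana (watch for sensitive information leaking through logs)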
Servers
/etc/hosts
30.3.229.120 develk01
30.3.229.121 develk02
30.3.229.122 develk03
30.3.229.123 devkafka01
30.3.229.124 devkafka02
30.3.229.125 devkafka03
Add a root-equivalent user
useradd -u 0 -o -g root -G root -d /root/ user1
echo "user1:passw0rD" | chpasswd
Installation
Upgrade Java to 1.8
Remove the old Java version
rpm -qa | grep jdk
yum -y remove jdk-1.7.0_79-fcs.x86_64
yum -y list java*
yum -y install java-1.8.0-openjdk.x86_64
rpm -i jdk-8u171-linux-x64.rpm
rpm -qa | grep logstash
rpm -e --nodeps logstash-5.6.10-1.noarch
Version selection
Beats
Elasticsearch
Elasticsearch Hadoop
Kibana
Logstash
X-Pack
Elasticsearch
/etc/elasticsearch/elasticsearch.yml # Elasticsearch configuration file
/etc/elasticsearch/jvm.options # JVM settings such as heap size
/etc/elasticsearch/log4j2.properties # logging configuration
/var/lib/elasticsearch # default data location
Cluster configuration
/etc/elasticsearch/elasticsearch.yml
cluster.name: elk-cluster
node.name: ${HOSTNAME}
#node.master: true
#node.data: true
network.host: 0.0.0.0
http.port: 9200
path.data: /data/els/data
path.logs: /data/els/logs
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
discovery.zen.ping.unicast.hosts: ["develk01", "develk02", "develk03"]
discovery.zen.minimum_master_nodes: 2
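The value 2 for discovery.zen.minimum_master_nodes follows the usual quorum rule for Zen discovery: a strict majority of the master-eligible nodes. A minimal Python sketch of that arithmetic, assuming all three develk hosts are master-eligible (the function name is hypothetical, not part of any Elasticsearch tooling):

```python
def minimum_master_nodes(master_eligible: int) -> int:
    """Quorum size: a strict majority of the master-eligible nodes."""
    if master_eligible < 1:
        raise ValueError("need at least one master-eligible node")
    return master_eligible // 2 + 1


# Assumption: every host in the unicast list is master-eligible.
hosts = ["develk01", "develk02", "develk03"]
quorum = minimum_master_nodes(len(hosts))
print(f"discovery.zen.minimum_master_nodes: {quorum}")
```

If master-eligible nodes are added or removed later, the setting should be recomputed the same way.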
/etc/security/limits.conf
* soft nofile 65536
* hard nofile 65536
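To confirm that the new limit is actually in effect for a process, the standard-library resource module can read it back. This is only a sketch: it reports the limit of whichever user runs the script, so it would need to be run as the elasticsearch user to be meaningful, and the helper name is hypothetical.

```python
import resource

REQUIRED_NOFILE = 65536  # the value written to /etc/security/limits.conf


def nofile_ok(required: int = REQUIRED_NOFILE) -> bool:
    """Return True if the soft open-file limit is at least `required`."""
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    # RLIM_INFINITY means "no limit", which always satisfies the requirement.
    return soft == resource.RLIM_INFINITY or soft >= required


soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f"nofile soft={soft} hard={hard} ok={nofile_ok()}")
```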
Startup errors
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000085330000, 2060255232, 0) failed; error='Cannot allocate memory' (errno=12)
Fix: lower the heap size in /etc/elasticsearch/jvm.options
-Xms512m
-Xmx512m
[2]: max number of threads [1832] for user [elasticsearch] is too low, increase to at least [2048]
Fix: raise the per-user thread limit
ulimit -u 2048
Create the data directories
rm -rf /data/    # WARNING: deletes everything under /data
mkdir -p /data/els/{logs,data}
chown -R elasticsearch:elasticsearch /data/*
service elasticsearch start
or
bin/elasticsearch -d
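List nodes
http://30.3.229.120:9200/_cat/nodes?v
Common operations
List indices
curl 'http://localhost:9200/_cat/indices?v'
Delete an index
curl -u elastic:changeme -XDELETE http://localhost:9200/my_index
Cluster health and nodes
curl 'http://localhost:9200/_cat/health?v'
curl 'http://localhost:9200/_cat/nodes?v'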
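The same calls can be scripted. The sketch below only builds the HTTP requests with the Python standard library and does not send them; the es_request helper is hypothetical, and the host, index name, and credentials are the ones used in the curl commands.

```python
import base64
import urllib.request


def es_request(path, method="GET", base="http://localhost:9200",
               user=None, password=None):
    """Build (but do not send) an HTTP request for an Elasticsearch endpoint."""
    req = urllib.request.Request(base + path, method=method)
    if user is not None:
        # HTTP basic auth, equivalent to `curl -u user:password`.
        token = base64.b64encode(f"{user}:{password}".encode()).decode()
        req.add_header("Authorization", f"Basic {token}")
    return req


list_indices = es_request("/_cat/indices?v")
delete_index = es_request("/my_index", method="DELETE",
                          user="elastic", password="changeme")

for req in (list_indices, delete_index):
    print(req.get_method(), req.full_url)
# Passing a request to urllib.request.urlopen() would actually execute it.
```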
Install X-Pack
bin/elasticsearch-plugin install x-pack
Change the default X-Pack password
curl -XPUT -u elastic 'localhost:9200/_xpack/security/user/elastic/_password' -d '{"password" : "dfh*&(dUJ"}'
Monitoring with elasticsearch-head
yum install nodejs
yum install npm
npm install -g grunt-cli
git config --global https.proxy http://127.0.0.1:1080
git clone git://github.com/mobz/elasticsearch-head.git
cd elasticsearch-head
npm config set strict-ssl false
npm config set registry https://registry.npm.taobao.org
npm config set proxy http://127.0.0.1:1080
npm info express
npm install
grunt server
Run in the background
nohup grunt server &
http://localhost:9100
Logstash
Yum repository definition (for example /etc/yum.repos.d/logstash.repo)
[logstash-5.x]
name=Elastic repository for 5.x packages
baseurl=https://artifacts.elastic.co/packages/5.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
/etc/logstash/conf.d/elk.conf
input -> filter -> output
input {
  syslog {
    syslog_field => "syslog"  # default: message
    port => 514
  }
  beats {
    port => 5044
  }
}
filter {
}
output {
  elasticsearch {
    hosts => ["192.168.200.109:9200"]
    index => "test-%{+YYYY.MM}"
  }
}
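The index setting "test-%{+YYYY.MM}" produces one index per month, named from the event's @timestamp, which Logstash keeps in UTC. A rough Python equivalent, as a sketch only (monthly_index is a hypothetical helper, the timestamps are made up, and strftime stands in for the Joda-style pattern Logstash uses):

```python
from datetime import datetime, timedelta, timezone


def monthly_index(prefix: str, ts: datetime) -> str:
    """Mimic `prefix-%{+YYYY.MM}`: format the event time in UTC."""
    # `ts` must be timezone-aware; naive datetimes would be treated as local time.
    utc = ts.astimezone(timezone.utc)
    return f"{prefix}-{utc:%Y.%m}"


# An event logged at 07:30 on 1 August in UTC+8 is still 31 July in UTC,
# so it lands in the July index.
shanghai = timezone(timedelta(hours=8))
event_time = datetime(2018, 8, 1, 7, 30, tzinfo=shanghai)
print(monthly_index("test", event_time))  # test-2018.07
```

This matters when deleting or querying indices by date: the boundary between two indices falls at midnight UTC, not local midnight.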
Start Logstash
nohup bin/logstash --debug --path.settings /etc/logstash/ -f config/test.conf > ls.log 2>&1 &
Multiple instances (each instance needs its own path.data)
bin/logstash -f config/syslog.conf --path.data=/tmp
https://www.elastic.co/guide/en/logstash/5.6/config-examples.html
Write to Kafka
input {
  stdin {}
}
output {
  kafka {
    topic_id => "test"
    codec => plain {
      format => "%{message}"
      charset => "UTF-8"
    }
    bootstrap_servers => "192.168.6.22:9092"
  }
  stdout {
    codec => rubydebug
  }
}
Read from Kafka
Unlike earlier versions, Logstash 5.0 and later connects to the Kafka brokers directly rather than to ZooKeeper.
input {
  kafka {
    bootstrap_servers => "192.168.6.22:9092"
    #client_id => "test"
    group_id => "test"
    auto_offset_reset => "latest"
    consumer_threads => 5
    decorate_events => true
    topics => ["test"]
  }
}
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "test-%{+YYYY.MM}"
    user => 'elastic'
    password => 'changeme'
  }
  stdout {
    codec => rubydebug
  }
}
Multiple processes
bin/logstash -f config/rsyslog.conf --path.data=/tmp
Install X-Pack monitoring
bin/logstash-plugin install file:///tmp/x-pack-5.6.10.zip
nano config/logstash.yml
xpack.monitoring.elasticsearch.url: ["http://10.2.0.27:9200"]
xpack.monitoring.elasticsearch.username: "elastic"
xpack.monitoring.elasticsearch.password: "dfh*&(dUJ"
xpack.monitoring.enabled: true
xpack.monitoring.collection.interval: 10s
Kafka
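Topic
Kafka organizes message feeds into categories; each category is called a topic.
Producer
An object that publishes messages to a topic is called a producer.
Consumer
An object that subscribes to topics and processes the published messages is called a consumer.
Broker
Published messages are stored on a set of servers called a Kafka cluster. Each server in the cluster is a broker. Consumers can subscribe to one or more topics and pull data from the brokers to consume the published messages.
Because brokers organize data as topic -> partition, ordering is guaranteed within a partition but not across partitions. Consumers therefore read per partition and can choose their own read position, the offset.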
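The partition and offset model can be illustrated with a toy in-memory topic. This is a sketch for intuition only, not how Kafka is implemented: ToyTopic is a hypothetical class, and CRC32 stands in for Kafka's real key-hashing partitioner.

```python
import zlib


class ToyTopic:
    """In-memory stand-in for a topic: a list of append-only partitions."""

    def __init__(self, partitions: int):
        self.partitions = [[] for _ in range(partitions)]

    def produce(self, key: str, value: str):
        # The same key always maps to the same partition, so messages
        # sharing a key keep their relative order.
        p = zlib.crc32(key.encode()) % len(self.partitions)
        self.partitions[p].append(value)
        return p, len(self.partitions[p]) - 1  # (partition, offset)

    def read(self, partition: int, offset: int):
        # A consumer picks its own starting offset within one partition.
        return self.partitions[partition][offset:]


topic = ToyTopic(partitions=3)
p, first = topic.produce("host-a", "m1")
topic.produce("host-a", "m2")
topic.produce("host-a", "m3")
print(topic.read(p, first + 1))  # ['m2', 'm3']
```

Messages with different keys may land in different partitions, and nothing in this model (or in Kafka) orders them relative to each other.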
Compatibility
With Logstash 5.x, choose Kafka 0.10.0.x.
# |==========================================================
# |Kafka Client Version |Logstash Version |Plugin Version |Why?
# |0.8 |2.0.0 - 2.x.x |<3.0.0 |Legacy, 0.8 is still popular
# |0.9 |2.0.0 - 2.3.x | 3.x.x |Works with the old Ruby Event API (`event['product']['price'] = 10`)
# |0.9 |2.4.x - 5.x.x | 4.x.x |Works with the new getter/setter APIs (`event.set('[product][price]', 10)`)
# |0.10.0.x |2.4.x - 5.x.x | 5.x.x |Not compatible with the <= 0.9 broker
# |==========================================================
wget http://mirrors.hust.edu.cn/apache/kafka/0.11.0.2/kafka-0.11.0.2-src.tgz
Configure server.properties
broker.id=0
listeners=PLAINTEXT://kafka01:9092
advertised.listeners=PLAINTEXT://kafka01:9092
Adjust the JVM heap size in the startup script
export KAFKA_HEAP_OPTS="-Xmx512M -Xms256M"
Start the services
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > zk.out 2>&1 &
nohup bin/kafka-server-start.sh config/server.properties > kafka.out 2>&1 &
or run Kafka as a daemon
bin/kafka-server-start.sh -daemon config/server.properties
Create a topic automatically (producing to a topic that does not exist creates it)
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
List the existing topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
Describe the test topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Delete a topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test  # only marks the topic for deletion
bin/zookeeper-shell.sh localhost:2181  # then remove it from ZooKeeper
ls /brokers/topics
rmr /brokers/topics/test
Alternatively, set delete.topic.enable=true in server.properties.
Test
Producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Machines running producers and consumers must have the Kafka hostnames in /etc/hosts.
Cluster
zookeeper.properties
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
server.0=kafka:2888:3888
server.1=kafka01:2889:3889
server.2=kafka02:2890:3890
Set each node's id in the dataDir directory (replace [server.id] with the N from that node's server.N entry)
mkdir -p /tmp/zookeeper/log && echo [server.id] > /tmp/zookeeper/myid
server.properties
# use a different broker.id on each broker
broker.id=0
listeners=PLAINTEXT://kafka:9092
advertised.listeners=PLAINTEXT://kafka:9092
zookeeper.connect=kafka:2181,kafka01:2181,kafka02:2181
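Since only broker.id and the listener host differ between brokers, the per-broker files can be generated rather than edited by hand. A minimal sketch, assuming the three hostnames from zookeeper.properties and the default ports (server_properties is a hypothetical helper):

```python
def server_properties(broker_id, host, zk_hosts, port=9092, zk_port=2181):
    """Render the per-broker part of server.properties as text."""
    zk = ",".join(f"{h}:{zk_port}" for h in zk_hosts)
    return "\n".join([
        f"broker.id={broker_id}",  # must be unique within the cluster
        f"listeners=PLAINTEXT://{host}:{port}",
        f"advertised.listeners=PLAINTEXT://{host}:{port}",
        f"zookeeper.connect={zk}",
    ])


hosts = ["kafka", "kafka01", "kafka02"]
configs = {h: server_properties(i, h, hosts) for i, h in enumerate(hosts)}
print(configs["kafka01"])
```

Numbering the brokers with enumerate keeps the ids unique by construction.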
Test
Create a topic, write messages to any broker, and read them back from any broker.
Create the topic
bin/kafka-topics.sh --create --zookeeper devkafka01:2181,devkafka02:2181,devkafka03:2181 --replication-factor 3 --partitions 3 --topic mytopic
List topics
bin/kafka-topics.sh --list --zookeeper localhost:2181
Describe the topic
bin/kafka-topics.sh --describe --zookeeper devkafka01:2181 --topic mytopic
Start a producer
bin/kafka-console-producer.sh --broker-list devkafka01:9092,devkafka02:9092,devkafka03:9092 --topic mytopic
Start a consumer
bin/kafka-console-consumer.sh --zookeeper devkafka01:2181,devkafka02:2181,devkafka03:2181 --from-beginning --topic mytopic
Monitoring
kom.sh
#!/bin/bash
java -cp KafkaOffsetMonitor-assembly-0.3.0-SNAPSHOT.jar \
  com.quantifind.kafka.offsetapp.OffsetGetterWeb \
  --offsetStorage kafka \
  --zk devkafka01,devkafka02,devkafka03 \
  --port 8080 \
  --refresh 10.seconds \
  --retain 1.days
nohup ./kom.sh > /dev/null 2>&1 &
Stress testing
Kibana
/etc/kibana/kibana.yml
server.port: 5601  # Kibana listening port
server.host: 0.0.0.0  # listening address
elasticsearch.url: "http://127.0.0.1:9200"  # Elasticsearch master node
elasticsearch.username: "elastic"
elasticsearch.password: "changeme"
Install X-Pack offline
bin/kibana-plugin install file:///tmp/x-pack-5.6.10.zip
service kibana start
or
nohup bin/kibana &
Charts
Pie chart of alert types
Bar chart of attack source addresses
Accessed URL information
Grafana
Beats
Winlogbeat 5.6.10
Configuration
winlogbeat.yml
#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.200.109:5044"]
#----------------------------- Kafka output -----------------------------------
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["192.168.6.22:9092"]
  topic: "test"
Logstash
input {
  beats {
    port => 5044
  }
}
Check the configuration
.\winlogbeat.exe -c .\winlogbeat.yml -configtest -e
Install
PS C:\winlogbeat-5.6.10-windows-x86_64> .\install-service-winlogbeat.ps1
Start the service
net start winlogbeat
Flume
X-Pack
Default credentials
username: elastic
password: changeme
Cracking the license
LicenseVerifier.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| package org.elasticsearch.license; import java.nio.*; import java.util.*; import java.security.*; import org.elasticsearch.common.xcontent.*; import org.apache.lucene.util.*; import org.elasticsearch.common.io.*; import java.io.*; public class LicenseVerifier { public static boolean verifyLicense(final License license, final byte[] encryptedPublicKeyData) { return true; } public static boolean verifyLicense(final License license) { return true; } }
|
javac -cp "/usr/share/elasticsearch/lib/elasticsearch-5.6.10.jar:/usr/share/elasticsearch/lib/lucene-core-6.6.1.jar:/usr/share/elasticsearch/plugins/x-pack/x-pack-5.6.10.jar" LicenseVerifier.java
Register the new license
1
| {"license":{"uid":"d3cbbbee-9155-4e1a-a5ed-a7e8940d6564","type":"platinum","issue_date_in_millis":1499299200000,"expiry_date_in_millis":2524579200999,"max_nodes":1000,"issued_to":"guo dalu (eastmoney)","issuer":"Web Form","signature":"AAAAAwAAAA0C9L3AjL50eKgiW55YAAABmC9ZN0hjZDBGYnVyRXpCOW5Bb3FjZDAxOWpSbTVoMVZwUzRxVk1PSmkxaktJRVl5MUYvUWh3bHZVUTllbXNPbzBUemtnbWpBbmlWRmRZb25KNFlBR2x0TXc2K2p1Y1VtMG1UQU9TRGZVSGRwaEJGUjE3bXd3LzRqZ05iLzRteWFNekdxRGpIYlFwYkJiNUs0U1hTVlJKNVlXekMrSlVUdFIvV0FNeWdOYnlESDc3MWhlY3hSQmdKSjJ2ZTcvYlBFOHhPQlV3ZHdDQ0tHcG5uOElCaDJ4K1hob29xSG85N0kvTWV3THhlQk9NL01VMFRjNDZpZEVXeUtUMXIyMlIveFpJUkk2WUdveEZaME9XWitGUi9WNTZVQW1FMG1DenhZU0ZmeXlZakVEMjZFT2NvOWxpZGlqVmlHNC8rWVVUYzMwRGVySHpIdURzKzFiRDl4TmM1TUp2VTBOUlJZUlAyV0ZVL2kvVk10L0NsbXNFYVZwT3NSU082dFNNa2prQ0ZsclZ4NTltbU1CVE5lR09Bck93V2J1Y3c9PQAAAQB2gL4WXN64P0+c5q6TDyhqPllFvkboZMWjzJHid05qCtI86/I0aSsFgYF3AkVA1qoz7UHsjC/xBsoyhuXfmHn6LbsZYXweZ4LsllG8RJ8HH/bBYVTBt+Mag+wXE/QZUS7HnSA8iAReQ7tY//wyuEVrxFDeAI9cgwWN90RoZ3sAgkzGq0jVr2JoUYeYwNJ4GZ2GMDS7GsHBxNWBJVgfDkZXvLya/jOJhaKi2GvW8mIzFp19/FO+t2+ReUkbF3T35nVIZnqFDVhXtOz981By4ArffE8ythlI4X67Nabtzoy87V5gXanBvsSdHiHpYJMrYwn7DU+93Ie6t56Lesjkj//b","start_date_in_millis":1499299200000}}
|
curl -XPUT -u elastic:changeme 'http://30.3.229.120:9200/_xpack/license?acknowledge=true' -d @l.json
curl -XGET -u elastic:changeme 'http://30.3.229.120:9200/_license'
Watcher
Watch for logs that contain alert and send an email notification
Configure elasticsearch.yml
xpack.notification.email.account:
  exchange_account:
    profile: outlook
    email_defaults:
      from: user@domain.com
    smtp:
      auth: true
      starttls.enable: false
      host: mail.domain.com
      port: 587
      user: user
      password: pass
"event_type": "alert",
{
  "trigger": {
    // fire every 5 minutes
    "schedule": { "interval": "5m" }
  },
  "input": {
    // search results
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": ["logstash-map-2018.07"],
        "types": [],
        "body": {
          "size": 0,
          "query": {
            // bool: both conditions must match
            "bool": {
              "must": [
                { "match": { "event_type": "alert" }},
                { "range": { "@timestamp": { "gte": "now-1h" }}}
              ]
            }
          }
        }
      }
    }
  },
  // condition that decides whether the actions run
  "condition": {
    "compare": { "ctx.payload.hits.total": { "gte": 10 }}
  },
  // how the alert is delivered
  "actions": {
    "my-logging-action": {
      "logging": {
        "level": "info",
        "text": "Found {{ctx.payload.hits.total}} alerts in the last hour."
      }
    }
  }
}
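The compare condition resolves a dotted path in the watch execution context and applies one operator to it. The Python sketch below imitates that logic for a single path and operator to make the behaviour concrete; it is not Watcher's implementation, the helper names are hypothetical, and the hit count of 12 is a made-up payload.

```python
import operator

# Operator names as used by the compare condition.
OPS = {"eq": operator.eq, "not_eq": operator.ne,
       "gt": operator.gt, "gte": operator.ge,
       "lt": operator.lt, "lte": operator.le}


def resolve(ctx: dict, path: str):
    """Walk a dotted path such as ctx.payload.hits.total through nested dicts."""
    node = {"ctx": ctx}
    for part in path.split("."):
        node = node[part]
    return node


def compare(ctx: dict, condition: dict) -> bool:
    """Evaluate a condition of the form {path: {operator: expected}}."""
    path, test = next(iter(condition.items()))
    op, expected = next(iter(test.items()))
    return OPS[op](resolve(ctx, path), expected)


ctx = {"payload": {"hits": {"total": 12}}}  # hypothetical search result
condition = {"ctx.payload.hits.total": {"gte": 10}}
print(compare(ctx, condition))  # True, so the logging action would run
```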
Aggregate failed logins by user within the query time range (now-1d below; use now-5m for a 5-minute window)
GET logstash-dc01-security-2018.08/_search
{
  "size": 1,
  "query": {
    "bool": {
      "must": [{ "match": { "event_id": 4625 }}],
      "filter": [{ "range": { "@timestamp": { "gte": "now-1d" }}}]
    }
  },
  "aggs": {
    "group_by_TargetUserName": {
      "terms": {
        "field": "event_data.TargetUserName.keyword"
      }
    }
  }
}
Result:
"aggregations": {
  "group_by_TargetUserName": {
    "doc_count_error_upper_bound": 0,
    "sum_other_doc_count": 0,
    "buckets": [
      {
        "doc_count": 6,
        "key": "test"
      },
      {
        "doc_count": 2,
        "key": "test1"
      },
      {
        "doc_count": 1,
        "key": "admin"
      }
    ]
  }
}
Arrays
ctx.payload.aggregations.group_by_TargetUserName.buckets
"condition": {
  "array_compare": {
    "ctx.payload.aggregations.group_by_TargetUserName.buckets": {
      "path": "doc_count",
      "gte": {
        "value": 25,
        "quantifier": "some"
      }
    }
  }
}
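With quantifier "some" the condition is met as soon as one bucket passes the comparison; with "all" every bucket must pass. The sketch below imitates that behaviour in Python using the three buckets from the result above (array_compare here is a hypothetical helper, not Watcher code):

```python
import operator

OPS = {"eq": operator.eq, "not_eq": operator.ne,
       "gt": operator.gt, "gte": operator.ge,
       "lt": operator.lt, "lte": operator.le}


def array_compare(items, path, op, value, quantifier="some"):
    """Apply `op` to item[path] for each item; 'some' = any, 'all' = every."""
    results = [OPS[op](item[path], value) for item in items]
    return any(results) if quantifier == "some" else all(results)


# Buckets copied from the aggregation result.
buckets = [
    {"doc_count": 6, "key": "test"},
    {"doc_count": 2, "key": "test1"},
    {"doc_count": 1, "key": "admin"},
]

print(array_compare(buckets, "doc_count", "gte", 25))         # False
print(array_compare(buckets, "doc_count", "gte", 5))          # True
print(array_compare(buckets, "doc_count", "gte", 5, "all"))   # False
```

With the sample data no user reaches 25 failures, so the watch as written would not fire; a threshold of 5 would fire because of the user "test".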
Actions
Send email
"send_email" : {
  "throttle_period": "15m",
  "email" : {
    "to" : "<username>@<domainname>",
    "cc": ["a@<domainname>", "b@<domainname>"],
    "subject" : "Watcher Notification",
    "body" : "Top10 users:\n{{#ctx.payload.aggregations.topn.buckets}}\n{{key}} {{doc_count}}\n{{/ctx.payload.aggregations.topn.buckets}}",
    "attachments" : {
      "attached_data" : {
        "data" : {
          "format" : "json"
        }
      }
    },
    "priority" : "high"
  }
}
webhook
Suricata
https://suricata-ids.org/
Installation
sudo yum -y install gcc libpcap-devel pcre-devel libyaml-devel file-devel \
  zlib-devel jansson-devel nss-devel libcap-ng-devel libnet-devel tar make \
  libnetfilter_queue-devel lua-devel
wget https://www.openinfosecfoundation.org/download/suricata-4.0.4.tar.gz
tar -zxvf suricata-4.0.4.tar.gz
cd suricata-4.0.4
./configure --prefix=/usr --sysconfdir=/etc --localstatedir=/var --enable-nfqueue --enable-lua
make && make install
make install-full
suricata -c /etc/suricata/suricata.yaml -i eth1
Update rules
pip install suricata-update
Rule sources
https://www.openinfosecfoundation.org/rules/index.yaml
et/open: https://rules.emergingthreats.net/open/suricata-%(__version__)s/emerging.rules.tar.gz
et/pro: https://rules.emergingthreatspro.com/%(secret-code)s/suricata-%(__version__)s/etpro.rules.tar.gz
oisf/trafficid: https://raw.githubusercontent.com/jasonish/suricata-trafficid/master/rules/traffic-id.rules
ptresearch/attackdetection: https://raw.githubusercontent.com/ptresearch/AttackDetection/master/pt.rules.tar.gz
scwx/malware: https://ws.secureworks.com/ti/ruleset/%(secret-code)s/Suricata_suricata-malware_latest.tgz
scwx/security: https://ws.secureworks.com/ti/ruleset/%(secret-code)s/Suricata_suricata-security_latest.tgz
sslbl/ssl-fp-blacklist: https://sslbl.abuse.ch/blacklist/sslblacklist.rules
Log collection
Syslog
Syslog log format
WAF
input {
  syslog {
    timezone => "Asia/Shanghai"
    id => "my_plugin_id"
    port => 514
  }
}
filter {
  # drop waf_log_wafstat
  if [severity] == 6 {
    drop { }
  }
  # waf log
  if [severity] == 3 {
    grok {
      match => { "message" => "tag:%{DATA:tag}\s*site_id:%{INT:site_id}\s*protect_id:%{INT:protect_id}\s*dst_ip:%{IPORHOST:dst_ip}\s*dst_port:%{INT:dst_port}\s*src_ip:%{IPORHOST:src_ip}\s*src_port:%{INT:src_port}\s*method:%{DATA:method}\s*domain:%{DATA:domain}\s*uri:%{DATA:uri}\s*alertlevel:%{DATA:alert_level}\s*event_type:%{DATA:event_type}\s*stat_time:%{TIMESTAMP_ISO8601:stat_time}\s*policy_id:%{INT:policy_id}\s*rule_id:%{INT:rule_id}\s*action:%{DATA:action}\s*block:%{DATA:block}\s*block_info:%{DATA:block_info}\s*http:%{DATA:http}\s*alertinfo:%{DATA:alertinfo}\s*proxy_info:%{DATA:proxy_info}\s*characters:%{DATA:characters}\s*count_num:%{INT:count_num}\s*protocol_type:%{DATA:protocol_type}\s*wci:%{DATA:wci}\s*wsi:%{DATA:wsi}\s*country:%{GREEDYDATA:country}" }
    }
    mutate {
      remove_field => ["message"]
    }
  }
}
output {
  elasticsearch {
    hosts => ["http://develk01:9200", "http://develk02:9200", "http://develk03:9200"]
    user => "elastic"
    password => "changeme"
    index => "waf-cs-syslog-217"
  }
  stdout { codec => rubydebug }
}
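The grok pattern depends on the fields arriving in a fixed order as key:value pairs separated by whitespace. The Python sketch below reproduces a small slice of it as a regular expression so the matching logic can be tried without Logstash. The sample line is invented for illustration and is not real WAF output; `.*?` plays the role of DATA and `\S+` is a loose stand-in for IPORHOST.

```python
import re

# Slice of the grok pattern (dst_ip through alertlevel) as a Python regex.
WAF_RE = re.compile(
    r"dst_ip:(?P<dst_ip>\S+)\s*"
    r"dst_port:(?P<dst_port>\d+)\s*"
    r"src_ip:(?P<src_ip>\S+)\s*"
    r"src_port:(?P<src_port>\d+)\s*"
    r"method:(?P<method>.*?)\s*"
    r"domain:(?P<domain>.*?)\s*"
    r"uri:(?P<uri>.*?)\s*"
    r"alertlevel:(?P<alert_level>.*?)\s*"
    r"event_type:"  # the next key anchors the preceding lazy capture
)

# Hypothetical log fragment, for illustration only.
sample = ("dst_ip:10.0.0.5  dst_port:80  src_ip:203.0.113.9  src_port:51234  "
          "method:GET  domain:example.com  uri:/index.php?id=1  "
          "alertlevel:HIGH  event_type:SQL_Injection")

match = WAF_RE.search(sample)
fields = match.groupdict() if match else {}
for name, value in fields.items():
    print(f"{name} => {value!r}")
```

Each lazy capture is bounded by the key that follows it, which is why the last field of a pattern like this needs a greedy match or an end anchor to capture anything.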
sysmon
input {
  beats {
    port => 5044
  }
}
filter {
  mutate {
    split => ["message", "\r"]
    remove_field => ["message", "beat", "@version"]
    lowercase => ["host"]  # index names must be lowercase
  }
}
# source_name values seen: "Microsoft-Windows-Security-Auditing", "Microsoft-Windows-Sysmon"
output {
  elasticsearch {
    hosts => ["http://develk01:9200"]
    index => "logstash-%{[host]}-%{+YYYY.MM.dd}"
  }
}