ELK

ELK Log Collection and Analysis

What is ELK
Broadly speaking, ELK is a suite composed of three open-source projects, Elasticsearch, Logstash, and Kibana (commonly extended with Filebeat), each of which covers a different role. ELK is also known as the ELK stack; the official domain is elastic.co. Its main strengths are:
Flexible processing: Elasticsearch provides real-time full-text indexing with powerful search capabilities.
Relatively simple configuration: Elasticsearch is driven entirely through JSON APIs, Logstash uses modular configuration, and Kibana's configuration files are simpler still.
Efficient retrieval: thanks to its design, even real-time queries over tens of billions of documents can respond within seconds.
Linear cluster scaling: both Elasticsearch and Logstash can scale out flexibly.
Polished front end: Kibana's UI is attractive and easy to operate.

What is Elasticsearch
A highly scalable open-source full-text search and analytics engine. It provides real-time full-text search, runs distributed for high availability, exposes an HTTP API, and can process large volumes of log data from sources such as Nginx, Tomcat, and the system logs.

What is Logstash
It collects and forwards logs through plugins, supports log filtering, and can parse both plain-text logs and custom JSON-formatted logs.

What is Kibana
It mainly queries data from Elasticsearch through the API and renders it as front-end visualizations.

Why use ELK
When operating systems that produce massive volumes of logs, the ELK components address the following needs:

  • Unified collection of distributed log data, enabling centralized querying and management
  • Troubleshooting
  • Security information and event management
  • Reporting

In big-data operations, the ELK components mainly solve the following problems:

  • Log querying, problem diagnosis, fault recovery, and self-healing
  • Application log analysis and error alerting
  • Performance analysis and user-behavior analysis

Architecture diagram:

kibana --> ES --> Logstash --> Redis --> Logstash --> webserver + filebeat

elasticsearch

Installing elasticsearch

Elasticsearch is a Java application, so a JDK must be installed beforehand.
Download the RPM package.
Edit the service configuration file on each elasticsearch server.

[root@node01 ~]# yum -y install elasticsearch-5.6.13.rpm
[root@node01 ~]# grep "^[a-Z]" /etc/elasticsearch/elasticsearch.yml
cluster.name: myelk #cluster name; nodes with the same name join the same cluster
path.data: /data/esdata/ #data directory
path.logs: /data/esdata/eslogs/ #log directory
network.host: 192.168.34.101 #listen IP
bootstrap.memory_lock: true #lock enough memory at startup to keep data out of swap
http.port: 9200 #listen port
discovery.zen.ping.unicast.hosts: ["node01", "master"]

Adjust the memory limits and sync the configuration file to the other nodes.

[root@linux-host1 ~]# vim /usr/lib/systemd/system/elasticsearch.service #raise the memory limit
LimitMEMLOCK=infinity #allow unlimited locked memory
[root@linux-host1 ~]# vim /etc/elasticsearch/jvm.options
-Xms2g
-Xmx2g #minimum and maximum heap size; setting them equal keeps the JVM from resizing the heap at runtime

Change directory ownership
On each server, create the data and log directories and change their ownership to the elasticsearch user.
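For example, matching the path.data and path.logs values configured above:

[root@node01 ~]# mkdir -p /data/esdata/eslogs
[root@node01 ~]# chown -R elasticsearch.elasticsearch /data/esdata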

Start elasticsearch. If the following error appears during startup:

localhost.localdomain elasticsearch[21616]: Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000085330000, 2060255232, 0) failed; error='Cannot allocate memory' (errno=12)

it means there is not enough memory; the fix is to lower the JVM startup heap size:

~]# vi /etc/elasticsearch/jvm.options
#-Xms2g
#-Xmx2g
-Xms256m
-Xmx256m

Start the service:

~]# systemctl start elasticsearch

Elasticsearch listens on port 9200 (client access) and port 9300 (intra-cluster communication, e.g. master election).
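A quick check that the node is answering (IP and port from the configuration above); it returns a short JSON document with the node name, cluster name, and version:

[root@node01 ~]# curl http://192.168.34.101:9200/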

elasticsearch plugins

The head plugin


Plugins extend functionality. The official ones are mostly paid, but community-developed plugins exist that can monitor cluster state and manage configuration.

As of elasticsearch 5.x, the head plugin can no longer be installed directly; it has to run as a separate service. Git repo: https://github.com/mobz/elasticsearch-head

Running the head plugin with docker
Install docker and pull the image:

[root@localhost tools]# docker image pull mobz/elasticsearch-head:5

Edit the elasticsearch service configuration file to enable cross-origin (CORS) access, then restart the elasticsearch service:

[root@localhost tools]# vi /etc/elasticsearch/elasticsearch.yml
http.cors.enabled: true
http.cors.allow-origin: "*"
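Then apply the change:

[root@localhost tools]# systemctl restart elasticsearch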

Start the dockerized head plugin:

[root@localhost tools]# docker run --name head -d -p 9100:9100 mobz/elasticsearch-head:5

Afterwards, browse to port 9100 on the docker host from a client to manage the cluster with the elasticsearch-head plugin.

The difference between Master and Slave:
The Master's responsibilities:

  • Tracking per-node status, aggregating cluster state, creating and deleting indices, managing shard allocation, decommissioning nodes, etc.

The Slave's responsibilities:

  • Replicating data from the master and standing by to become Master

The kopf plugin

The Git repo is https://github.com/lmenezes/elasticsearch-kopf. It does not yet support elasticsearch 5.x, but it can be installed on elasticsearch 1.x or 2.x.

Monitoring elasticsearch cluster status

Fetch the cluster status from the shell:

[root@localhost tools]#  curl -sXGET  http://192.168.34.101:9200/_cluster/health?pretty=true
{
  "cluster_name" : "myelk",
  "status" : "green",
  "timed_out" : false,
  "number_of_nodes" : 2,
  "number_of_data_nodes" : 2,
  "active_primary_shards" : 0,
  "active_shards" : 0,
  "relocating_shards" : 0,
  "initializing_shards" : 0,
  "unassigned_shards" : 0,
  "delayed_unassigned_shards" : 0,
  "number_of_pending_tasks" : 0,
  "number_of_in_flight_fetch" : 0,
  "task_max_waiting_in_queue_millis" : 0,
  "active_shards_percent_as_number" : 100.0
}

The call returns JSON, so the fields can be analyzed programmatically, for example with Python. The status field is the key one: green means the cluster is healthy, yellow means replica shards are missing, and red means primary shards are missing.

Python script (prints 50 when the cluster is green and 100 otherwise, so a monitoring system can alert on the value):

#!/usr/bin/env python
#coding:utf-8
#Author Msq

import json
import subprocess

# Query the cluster health API and parse the JSON response.
obj = subprocess.Popen(("curl -sXGET http://192.168.15.211:9200/_cluster/health?pretty=true"),
                       shell=True, stdout=subprocess.PIPE)
data = obj.stdout.read()
status = json.loads(data).get("status")
# Print 50 if the cluster is green, 100 otherwise.
if status == "green":
    print("50")
else:
    print("100")

logstash

Logstash's monitoring API listens on the 9600-9700 port range.
Prepare the environment: disable selinux, stop firewalld, and install a Java environment.

[root@linux-host3 ~]# systemctl  stop firewalld
[root@linux-host3 ~]# systemctl disable firewalld
[root@linux-host3 ~]# sed -i '/SELINUX/s/enforcing/disabled/' /etc/selinux/config
[root@linux-host3 ~]# yum install jdk-8u121-linux-x64.rpm
[root@linux-host3 ~]# java -version
java version "1.8.0_121"
Java(TM) SE Runtime Environment (build 1.8.0_121-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)
[root@linux-host3 ~]# reboot

Install logstash

[root@localhost tools]# yum -y install logstash-5.6.13.rpm
[root@localhost tools]# chown logstash.logstash /usr/share/logstash/data/queue -R #change ownership to the logstash user and group, otherwise logstash logs errors at startup

Test logstash
Test standard input and output:

[root@localhost tools]# /usr/share/logstash/bin/logstash   -e 'input {  stdin{} } output { stdout{  codec => rubydebug }}'
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
The stdin plugin is now waiting for input:
hello world!
{
"@version" => "1",
"host" => "localhost.localdomain",
"@timestamp" => 2019-03-06T14:57:58.006Z,
"message" => "hello world!"
}
中国强
{
"@version" => "1",
"host" => "localhost.localdomain",
"@timestamp" => 2019-03-06T14:58:11.958Z,
"message" => "中国强"
}

Test output to a file

[root@localhost tools]# /usr/share/logstash/bin/logstash   -e 'input {  stdin{} } output { file { path => "/tmp/log-%{+YYYY.MM.dd}messages.gz"}}'
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
The stdin plugin is now waiting for input:
这是一个测试
^C[root@localhost tools]# cat /tmp/log-2019.03.06messages.gz
{"@version":"1","host":"localhost.localdomain","@timestamp":"2019-03-06T15:00:28.838Z","message":"这是一个测试"}

Test output to elasticsearch

[root@localhost tools]# /usr/share/logstash/bin/logstash   -e 'input {  stdin{} } output { elasticsearch {hosts => ["192.168.34.100:9200"] index => "mytest-%{+YYYY.MM.dd}" }}'
WARNING: Could not find logstash.yml which is typically located in $LS_HOME/config or /etc/logstash. You can specify the path using --path.settings. Continuing using the defaults
Could not find log4j2 configuration at path /usr/share/logstash/config/log4j2.properties. Using default config which logs errors to the console
The stdin plugin is now waiting for input:
这是一个测试

Verify on the elasticsearch server that the data arrived:

[root@localhost tools]# ll /data/esdata/nodes/0/indices/
total 0
drwxr-xr-x. 8 elasticsearch elasticsearch 65 Mar 6 23:03 kr-hxCstTfCk_v2wI9QsAw
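The same check can be made over the REST API, which lists every index along with its document count:

[root@localhost tools]# curl http://192.168.34.100:9200/_cat/indices?v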

kibana

Kibana listens on port 5601.
Install and configure kibana:

[root@localhost tools]# yum -y install kibana-5.3.0-x86_64.rpm
[root@localhost tools]# grep "^[a-Z]" /etc/kibana/kibana.yml
server.port: 5601 #listen port
server.host: "0.0.0.0" #listen address
server.name: "mykibana" #instance name
elasticsearch.url: "http://192.168.34.100:9200" #elasticsearch server address

Start the kibana service and verify it is listening:

[root@localhost tools]# systemctl  start kibana
[root@localhost tools]# systemctl enable kibana
[root@localhost tools]# ss -tnl | grep 5601

Browse to the page from a client to start using kibana.
If no bar chart is shown by default, it is probably because no new data has been written recently; widen the date range or write fresh data through logstash.

Collecting logs with logstash

[root@localhost conf.d]# cat system-log.conf 
input {
  file {
    type => "messagelog"
    path => "/var/log/messages"
    start_position => "beginning"
    stat_interval => "3"
  }
  file {
    type => "securelog"
    path => "/var/log/secure"
    start_position => "beginning"
    stat_interval => "3"
  }
}

output {
  if [type] == "messagelog" {
    elasticsearch {
      hosts => "192.168.34.100:9200"
      index => "messagelog-100-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "securelog" {
    elasticsearch {
      hosts => "192.168.34.101:9200"
      index => "securelog-101-%{+YYYY.MM.dd}"
    }
  }
}
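Before starting the service, the file can be syntax-checked with logstash's test flag (-t, short for --config.test_and_exit); it may print the same --path.settings warning seen in the earlier tests:

[root@localhost conf.d]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/system-log.conf -t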

Collecting tomcat and java logs with logstash

Install a Java environment and set up a custom web page for testing:

[root@linux-host6 ~]# yum install jdk-8u121-linux-x64.rpm
[root@linux-host6 ~]# cd /usr/local/src/
[root@linux-host6 src]# tar xvf apache-tomcat-8.0.38.tar.gz
[root@linux-host6 src]# ln -sv /usr/local/src/apache-tomcat-8.0.38 /usr/local/tomcat
‘/usr/local/tomcat’ -> ‘/usr/local/src/apache-tomcat-8.0.38’
[root@linux-host6 tomcat]# cd /usr/local/tomcat/webapps/
[root@linux-host6 webapps]#mkdir /usr/local/tomcat/webapps/webdir
[root@linux-host6 webapps]# echo "Tomcat Page" > /usr/local/tomcat/webapps/webdir/index.html
[root@linux-host6 webapps]# ../bin/catalina.sh start
[root@linux-host6 webapps]# ss -tnl | grep 8080
LISTEN 0 100 :::8080 :::*

Confirm that tomcat can be reached.

Convert the tomcat access log to JSON

[root@node02 tomcat]# vi conf/server.xml
...
<Host name="localhost" appBase="webapps"
unpackWARs="true" autoDeploy="true">
<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
prefix="tomcat_access_log" suffix=".log"
pattern="{&quot;clientip&quot;:&quot;%h&quot;,&quot;ClientUser&quot;:&quot;%l&quot;,&quot;authenticated&quot;:&quot;%u&quot;,&quot;AccessTime&quot;:&quot;%t&quot;,&quot;method&quot;:&quot;%r&quot;,&quot;status&quot;:&quot;%s&quot;,&quot;SendBytes&quot;:&quot;%b&quot;,&quot;Query?string&quot;:&quot;%q&quot;,&quot;partner&quot;:&quot;%{Referer}i&quot;,&quot;AgentVersion&quot;:&quot;%{User-Agent}i&quot;}"/>
</Host>

Restart tomcat.
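To sanity-check the new format, request the test page and run the latest access-log line through a JSON parser (the date suffix assumes the AccessLogValve default of yyyy-MM-dd):

[root@node02 tomcat]# curl -s http://127.0.0.1:8080/webdir/index.html
[root@node02 tomcat]# tail -n 1 logs/tomcat_access_log.$(date +%Y-%m-%d).log | python -m json.tool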

Install logstash on the tomcat host and edit its configuration file:

[root@node02 logs]# cat /etc/logstash/conf.d/tomcat.conf 
input {
  file {
    path => "/usr/local/tomcat/logs/catalina.out"
    start_position => "beginning"
    stat_interval => 3
    type => "tomcatlog"
  }
  file {
    path => "/usr/local/tomcat/logs/tomcat_access_log.*.log"
    start_position => "beginning"
    stat_interval => 3
    type => "accesslog"
    codec => "json"
  }
}

output {
  if [type] == "tomcatlog" {
    elasticsearch {
      hosts => "192.168.34.100:9200"
      index => "tomcat-102-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "accesslog" {
    elasticsearch {
      hosts => "192.168.34.101:9200"
      index => "logstash-tomcat-accesslog-102-%{+YYYY.MM.dd}"
    }
  }
}

Start logstash:

[root@node02 ~]# systemctl start logstash

Then add the corresponding index in kibana to browse the collected tomcat access logs.

About sincedb:

[root@node02 ~]# cat /var/lib/logstash/plugins/inputs/file/.sincedb_19224e5b5b842ee829563544811600e0 
0 0 0 0
34905429 0 64768 11370

It records the inode information and read position of the collected files: the first column is the inode, the next two are the device's major and minor numbers, and the last is the byte offset already read.
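The inode of a watched file can be checked directly to match it against a sincedb entry (path taken from the config above):

[root@node02 ~]# ls -i /usr/local/tomcat/logs/catalina.out #the first sincedb column should match this inode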

Collecting java logs with logstash

Use the codec multiline plugin to merge related lines into a single event; its what option controls whether a matched line is merged with the previous lines or with the following ones. https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html

Deploy logstash on the server and test it against generated input:

input {
  stdin {
    codec => multiline {
      pattern => "^\[" #a line starting with [ begins a new event; accumulated lines are merged
      negate => true #true operates on lines that do NOT match the pattern; false on lines that do
      what => "previous" #merge with the previous lines; use next to merge with the following ones
    }
  }
}
filter { #filters placed here apply to all logs; to filter a single source, attach it to that input instead
}
output {
  stdout {
    codec => rubydebug
  }
}
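A quick way to exercise this config from the shell (the file name multiline-test.conf and the sample lines are made up for illustration):

printf '[2019-03-06 10:00:00] first event\nstack line 1\nstack line 2\n[2019-03-06 10:00:01] second event\n' | /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/multiline-test.conf

The first three lines are emitted as a single event once the second [ line arrives.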

Configure it to read the log file and change the output to elasticsearch:

[root@node02 conf.d]# cat java.conf 
input {
  file {
    path => "/elk/logs/ELK-Cluster.log"
    type => "javalog"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
}

output {
  if [type] == "javalog" {
    elasticsearch {
      hosts => ["192.168.34.100:9200"]
      index => "javalog-javalog-102-%{+YYYY.MM.dd}"
    }
  }
}

Then restart the elasticsearch service so that it writes fresh log lines, and verify that logstash picks up the newly generated logs automatically.
Finally, create the new index in kibana to browse the collected logs.

Collecting nginx access logs with logstash

Deploy the nginx service:

[root@linux-host6 ~]# yum install gcc gcc-c++ automake pcre pcre-devel zlib zlib-devel openssl openssl-devel
[root@linux-host6 ~]# cd /usr/local/src/
[root@linux-host6 src]# wget http://nginx.org/download/nginx-1.10.3.tar.gz
[root@linux-host6 src]# tar xvf nginx-1.10.3.tar.gz
[root@linux-host6 src]# cd nginx-1.10.3
[root@linux-host6 nginx-1.10.3]# ./configure --prefix=/usr/local/nginx-1.10.3
[root@linux-host6 nginx-1.10.3]# make && make install
[root@linux-host6 nginx-1.10.3]# ln -sv /usr/local/nginx-1.10.3 /usr/local/nginx
‘/usr/local/nginx’ -> ‘/usr/local/nginx-1.10.3’

Edit the configuration file and prepare a web page.

Convert the nginx access log to JSON:

[root@node02 ~]# vi /etc/nginx/nginx.conf
http {
...
    log_format access_json '{"@timestamp":"$time_iso8601",'
        '"host":"$server_addr",'
        '"clientip":"$remote_addr",'
        '"size":$body_bytes_sent,'
        '"responsetime":$request_time,'
        '"upstreamtime":"$upstream_response_time",'
        '"upstreamhost":"$upstream_addr",'
        '"http_host":"$host",'
        '"url":"$uri",'
        '"domain":"$host",'
        '"xff":"$http_x_forwarded_for",'
        '"referer":"$http_referer",'
        '"status":"$status"}';
    access_log /var/log/nginx/access.log access_json;
...
}
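Check the syntax and reload nginx so the new log format takes effect (assuming a package-installed nginx managed by systemd):

[root@node02 ~]# nginx -t
[root@node02 ~]# systemctl reload nginx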

Configure logstash to collect the nginx log:

[root@node02 conf.d]# pwd
/etc/logstash/conf.d
[root@node02 conf.d]# cat nginx.conf
input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "end"
    type => "nginx-accesslog"
    codec => "json"
  }
}

output {
  elasticsearch {
    hosts => "192.168.34.100:9200"
    index => "logstash-nginx-accesslog-102-%{+YYYY.MM.dd}"
  }
}

Collecting TCP/UDP logs with logstash

The logstash tcp/udp input plugins are typically used to backfill logs that are missing from elasticsearch: write the lost entries to a file, send the file to logstash over TCP, and let logstash write them into the elasticsearch server.
https://www.elastic.co/guide/en/logstash/5.6/input-plugins.html

Edit the logstash configuration file and test collection:

[root@node02 conf.d]# cat tcp.conf 
input {
  tcp {
    port => 9889
    type => "tcplog"
    mode => "server"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}

Port 9889 is now in the listening state. Install the nc command on another server for testing:

[root@node01 ~]# yum -y install nc
[root@node01 ~]# echo "nc test" | nc 192.168.34.102 9889

Send a whole file with nc:

nc 192.168.15.16 9889 < /etc/passwd

Sending messages through a pseudo-device
In Unix-like operating systems, block devices map to hardware such as disks and memory, but a device node does not have to correspond to a physical device; nodes without such a mapping are called pseudo-devices, for example /dev/null, /dev/zero, /dev/random, and /dev/tcp and /dev/udp. Linux uses these pseudo-devices to provide a variety of different functions; TCP communication is just one of the many pseudo-devices under /dev.

[root@linux-host1 ~]# echo "伪设备"  > /dev/tcp/192.168.15.16/9889

Change the output to elasticsearch:

[root@node02 conf.d]# cat tcp.conf 
input {
  tcp {
    port => 9889
    type => "tcplog"
    mode => "server"
  }
}

output {
  elasticsearch {
    hosts => "192.168.34.100:9200"
    index => "logstash-tcplog-%{+YYYY.MM.dd}"
  }
}

Then add the index in kibana to view the collected tcp logs.

Collecting haproxy logs with rsyslog

In CentOS 6 and earlier the syslog daemon was called syslog; from CentOS 7 onward it is rsyslog. According to the official site, rsyslog (as of its 2013 release) can forward on the order of a million messages per second. Official site: http://www.rsyslog.com/. Check the installed version as follows:

[root@node02 conf.d]# rpm -q rsyslog
rsyslog-8.24.0-16.el7.x86_64

Edit the haproxy and rsyslog configuration files so that haproxy logs are generated and forwarded:

[root@node02 conf.d]# vi /etc/haproxy/haproxy.cfg 
global
  log 127.0.0.1 local2 info
[root@node02 rsyslog.d]# vi haproxy.conf
$ModLoad imudp
$UDPServerRun 514

$ModLoad imtcp
$InputTCPServerRun 514

local2.* /var/log/haproxy.log
local2.* @@192.168.34.102:516

Then restart haproxy and rsyslog and make sure both start cleanly. In rsyslog's forwarding syntax a single @ sends over UDP and @@ over TCP, so the last line above forwards local2 messages over TCP to 192.168.34.102:516.

Edit the logstash configuration file
Receive the forwarded logs and ship them to elasticsearch:

[root@node02 conf.d]# pwd
/etc/logstash/conf.d
[root@node02 conf.d]# cat haproxy.conf
input {
  syslog {
    type => "system-rsyslog"
    port => "516"
  }
}

output {
  elasticsearch {
    hosts => ["192.168.34.100:9200"]
    index => "logstash-rsyslog-haproxy-%{+YYYY.MM.dd}"
  }
}
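A test message can be pushed through the whole chain with logger, which writes to the local rsyslog using the local2 facility configured above:

[root@node02 ~]# logger -p local2.info "haproxy rsyslog test"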

Then add the index in kibana to bring the haproxy logs into ELK.

logstash collects logs and writes them to redis

Deploy the redis service on a dedicated server as a log write buffer, for scenarios where web servers produce large volumes of logs.
Deployment:

[root@linux-host2 ~]# cd /usr/local/src/
[root@linux-host2 src]# wget http://download.redis.io/releases/redis-3.2.8.tar.gz
[root@linux-host2 src]# tar xvf redis-3.2.8.tar.gz
[root@linux-host2 src]# ln -sv /usr/local/src/redis-3.2.8 /usr/local/redis
‘/usr/local/redis’ -> ‘/usr/local/src/redis-3.2.8’
[root@linux-host2 src]#cd /usr/local/redis/deps
[root@linux-host2 redis]# yum install gcc
[root@linux-host2 deps]# make geohash-int hiredis jemalloc linenoise lua
[root@linux-host2 deps]# cd ..
[root@linux-host2 redis]# make
[root@linux-host2 redis]# vim redis.conf
[root@linux-host2 redis]# grep "^[a-Z]" redis.conf #the settings that were mainly changed
[root@node01 ~]# grep "^[a-Z]" /etc/redis.conf
bind 0.0.0.0
protected-mode yes
port 6379
tcp-backlog 511
timeout 0
tcp-keepalive 300
daemonize yes
supervised no
pidfile /var/run/redis_6379.pid
loglevel notice
logfile /var/log/redis/redis.log
databases 16
save 900 1
save 300 10
save 60 10000
stop-writes-on-bgsave-error yes
rdbcompression no
rdbchecksum no
dbfilename dump.rdb
dir /var/lib/redis
slave-serve-stale-data yes
slave-read-only yes
repl-diskless-sync no
repl-diskless-sync-delay 5
repl-disable-tcp-nodelay no
slave-priority 100
appendonly no
appendfilename "appendonly.aof"
appendfsync everysec
no-appendfsync-on-rewrite no
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
aof-load-truncated yes
lua-time-limit 5000
slowlog-log-slower-than 10000
slowlog-max-len 128
latency-monitor-threshold 0
notify-keyspace-events ""
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
list-max-ziplist-size -2
list-compress-depth 0
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
hll-sparse-max-bytes 3000
activerehashing yes
client-output-buffer-limit normal 0 0 0
client-output-buffer-limit slave 256mb 64mb 60
client-output-buffer-limit pubsub 32mb 8mb 60
hz 10
aof-rewrite-incremental-fsync yes

Start redis and set an access password

[root@node01 ~]# redis-cli -h 127.0.0.1
127.0.0.1:6379> CONFIG SET requirepass 123456 #for safety, production redis must require a password; it can also be set permanently with requirepass 123456 in redis.conf
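A quick sanity check that the password took effect, using the standard AUTH and PING commands:

127.0.0.1:6379> AUTH 123456
OK
127.0.0.1:6379> PING
PONG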

Write the logs collected by logstash into redis
Ship the tomcat access logs collected by logstash on the tomcat server into the redis server, then have a separate logstash pull the data out of redis and write it into the elasticsearch server.
Official docs: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-redis.html

[root@node02 conf.d]# cat tomcat_tcp.conf 
input {
  file {
    path => "/usr/local/tomcat/logs/tomcat_access_log.*.log"
    start_position => "beginning"
    stat_interval => 3
    type => "accesslog"
  }
  tcp {
    port => 516
    mode => "server"
    type => "tcplog-102"
  }
}

output {
  if [type] == "accesslog" {
    redis {
      data_type => "list"
      key => "tomcat-accesslog-102"
      host => "192.168.34.101"
      port => "6379"
      db => "0"
      password => "123456"
    }
  }
  if [type] == "tcplog-102" {
    redis {
      data_type => "list"
      key => "tcplog-102"
      host => "192.168.34.101"
      port => "6379"
      db => "1"
      password => "123456"
    }
  }
}
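Once events flow, the list keys should be visible on the redis server. With data_type list, logstash pushes each event onto the list, so its length grows as events arrive:

[root@node01 ~]# redis-cli -h 192.168.34.101 -a 123456 -n 0 LLEN tomcat-accesslog-102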

Configure another logstash server to read from redis
A dedicated logstash server reads the specified keys from redis and writes the data into elasticsearch.

[root@node01 conf.d]# cat redis-es.conf 
input {
  redis {
    data_type => "list"
    key => "tomcat-accesslog-102"
    host => "192.168.34.101"
    port => "6379"
    db => "0"
    password => "123456"
    codec => "json"
  }

  redis {
    data_type => "list"
    key => "tcplog-102"
    host => "192.168.34.101"
    port => "6379"
    db => "1"
    password => "123456"
    codec => "json"
  }
}

output {
  if [type] == "accesslog" { #matches the type set by the file input on the tomcat host
    elasticsearch {
      hosts => "192.168.34.100:9200"
      index => "redis-tomcat-102-%{+YYYY.MM.dd}"
    }
  }
  if [type] == "tcplog-102" {
    elasticsearch {
      hosts => "192.168.34.100:9200"
      index => "redis-tcplog-102-%{+YYYY.MM.dd}"
    }
  }
}

At this point, add the corresponding index in kibana to see the data.

logstash collects logs and writes them to kafka

Kafka pairs with zookeeper, which makes scaling out much easier.
Kafka keeps its cluster metadata in the distributed coordination service zookeeper; the message data itself is stored on the kafka brokers.
Installing kafka
Broker
A Kafka cluster contains one or more servers; each of these servers is called a broker.

Topic
Every message published to a Kafka cluster has a category called its topic. (Physically, messages of different topics are stored separately; logically, a topic's messages may live on one or more brokers, but producers and consumers only need to name the topic, not care where the data is stored.)

Partition
A partition is a physical concept; each topic contains one or more partitions, and the number can be set at topic creation. Each partition maps to a directory that stores that partition's data and index files.

Producer
Publishes messages to a Kafka broker.

Consumer
Consumes messages. Each consumer belongs to a specific consumer group (a group name can be assigned per consumer; without one, the consumer joins the default group). With the high-level consumer API, a given message on a topic is consumed by only one consumer within the same consumer group, but multiple consumer groups can each consume that message once.

Install and start kafka on each of the three servers:

[root@linux-host1 src]# tar xvf kafka_2.11-1.0.0.tgz
[root@linux-host1 src]# ln -sv /usr/local/src/kafka_2.11-1.0.0 /usr/local/kafka
[root@linux-host1 src]# vim /usr/local/kafka/config/server.properties
broker.id=1 #a globally unique integer ID for each broker
listeners=PLAINTEXT://192.168.15.211:9092
log.retention.hours=24 #how many hours of log content to retain
zookeeper.connect=192.168.15.211:2181,192.168.15.212:2181,192.168.15.213:2181 #all zookeeper addresses

Start kafka:

~]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties  #start as a daemon

Test kafka:

[root@localhost local]# jps
4789 Kafka
1383 QuorumPeerMain
5147 Jps

Test topic creation
Create a topic named logstashtest with 3 partitions and a replication factor of 3.
Run on any kafka server:

[root@node01 local]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34.102:2181 --partitions 3 --replication-factor 3 --topic logstashtest

Test describing the topic
This can be run on any kafka server:

[root@localhost local]# kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34.102:2181 --topic logstashtest
Topic:logstashtest PartitionCount:3 ReplicationFactor:3 Configs:
Topic: logstashtest Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: logstashtest Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: logstashtest Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3

Reading the status: logstashtest has three partitions numbered 0, 1, and 2; the leader of partition 0 is broker 3 (its broker.id); partition 0 has three replicas, all listed in Isr (in-sync replicas, i.e. eligible to be elected leader).

Delete a specific topic

[root@localhost local]# kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34.102:2181 --topic logstashtest
Topic logstashtest is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

List all topics

[root@localhost local]# kafka/bin/kafka-topics.sh --list --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34.102:2181

Testing message delivery with the kafka commands

Create a topic:

[root@localhost local]# kafka/bin/kafka-topics.sh --create --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34.102:2181 --partitions 3 --replication-factor 3 --topic messagetest
Created topic "messagetest".

Send messages:

[root@localhost local]# kafka/bin/kafka-console-producer.sh --broker-list 192.168.34.100:9092,192.168.34.101:9092,192.168.34.102:9092 --topic messagetest
>hello
>kafka
>logstash
>ss
>oo
>start
>

Consume the data from another kafka server:

[root@node01 local]# kafka/bin/kafka-console-consumer.sh  --bootstrap-server 192.168.34.100:9092,192.168.34.101:9092,192.168.34.102:9092 --topic messagetest --from-beginning
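With the kafka cluster verified, the logstash side is a kafka output. A minimal sketch, written here as a shell heredoc (the file name kafka-out.conf is hypothetical; the broker list and topic are taken from the tests above):

cat > /etc/logstash/conf.d/kafka-out.conf <<'EOF'
input { stdin {} }
output {
  kafka {
    bootstrap_servers => "192.168.34.100:9092,192.168.34.101:9092,192.168.34.102:9092"
    topic_id => "messagetest"
    codec => "json"
  }
}
EOF

Events written to this output should then appear in the console consumer shown above.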