ELK日志收集分析
什么是ELK
通俗来讲,ELK是由Elasticsearch、Logstash、Kibana 、filebeat三个开源软件的组成的一个组合体,这三个软件当中,每个软件用于完成不同的功能,ELK 又称为ELK stack,官方域名为stactic.co,ELK stack的主要优点有如下几个:
处理方式灵活: elasticsearch是实时全文索引,具有强大的搜索功能
配置相对简单:elasticsearch全部使用JSON 接口,logstash使用模块配置,kibana的配置文件部分更简单。
检索性能高效:基于优秀的设计,虽然每次查询都是实时,但是也可以达到百亿级数据的查询秒级响应。
集群线性扩展:elasticsearch和logstash都可以灵活线性扩展
前端操作绚丽:kibana的前端设计比较绚丽,而且操作简单
什么是elasticsearch
是一个高度可扩展的开源全文搜索和分析引擎,它可实现数据的实时全文搜索搜索、支持分布式可实现高可用、提供API接口,可以处理大规模日志数据,比如Nginx、Tomcat、系统日志等功能。
什么是Logstash
可以通过插件实现日志收集和转发,支持日志过滤,支持普通log、自定义json格式的日志解析。
什么是kibana
主要是通过接口调用elasticsearch的数据,并进行前端数据可视化的展现。
为什么使用 ELK
ELK组件在海量日志系统的运维中,可用于解决以下主要问题:
- 分布式日志数据统一收集,实现集中式查询和管理
- 故障排查
- 安全信息和事件管理
- 报表功能
ELK组件在大数据运维系统中,主要可解决的问题如下:
- 日志查询,问题排查,故障恢复,故障自愈
- 应用日志分析,错误报警
- 性能分析,用户行为分析
架构图:
kibana –> ES –> Logstash –> Redis –> Logstash –> webserver + filebeat
elasticsearch
安装elasticsearch
elasticsearch是java程序,需要预先准备jdk环境。
下载rpm包
编辑各elasticsearch服务器的服务配置文件
1 | [root@node01 ~]# yum -y install elasticsearch-5.6.13.rpm |
修改内存限制,并同步配置文件
1 | [root@linux-host1 ~]# vim /usr/lib/systemd/system/elasticsearch.service #修改内存限制 |
目录权限更改
各服务器创建数据和日志目录并修改目录权限为elasticsearch
启动elasticsearch,如果启动过程中出现以下错误:
1 | localhost.localdomain elasticsearch[21616]: Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x0000000085330000, 2060255232, 0) failed; error='Cannot allocate memory' (errno=12) |
说明内存不够,可以通过调小启动内存解决:
1 | ~]# vi /etc/elasticsearch/jvm.options |
启动
1 | ~]# systemctl start elasticsearch |
elasticsearch会监听9200(用户访问端口),9300(集群内通信端口,比如master选举)端口。
elasticsearch的插件
head插件
‘
插件是为了完成不同的功能,官方提供了一些插件但大部分是收费的,另外也有一些开发爱好者提供的插件,可以实现对elasticsearch集群的状态监控与管理配置等功能。
在elasticsearch 5.x版本以后不再支持直接安装head插件,而是需要通过启动一个服务方式,git地址:https://github.com/mobz/elasticsearch-head
使用docker启动head插件
安装docker,并下载镜像
1 | [root@localhost tools]# docker image pull mobz/elasticsearch-head:5 |
修改elasticsearch服务配置文件:
开启跨域访问支持,然后重启elasticsearch服务:
1 | [root@localhost tools]# vi /etc/elasticsearch/elasticsearch.yml |
docker版本启动head插件
1 | [root@localhost tools]# docker run --name head -d -p 9100:9100 mobz/elasticsearch-head:5 |
之后在客户端浏览器访问宿主机的9100端口就可以使用mobz/ealasticsearch-head插件管理集群。
Master与Slave的区别:
Master的职责:
- 统计各node节点状态信息、集群状态信息统计、索引的创建和删除、索引分配的管理、关闭node节点等
Slave的职责:
- 从master同步数据、等待机会成为Master
kopf插件
Git地址为https://github.com/lmenezes/elasticsearch-kopf,但是目前还不支持5.x版本的elasticsearch,但是可以安装在elasticsearc 1.x或2.x的版本安装。
监控elasticsearch集群状态
通过shell命令获取集群状态
1 | [root@localhost tools]# curl -sXGET http://192.168.34.101:9200/_cluster/health?pretty=true |
获取到的是一个json格式的返回值,那就可以通过python对其中的信息进行分析,例如对status进行分析,如果等于green(绿色)就是运行在正常,等于yellow(黄色)表示副本分片丢失,red(红色)表示主分片丢失
python脚本
1 | #!/usr/bin/env python |
logstash
监听9600-9700
环境准备,关闭selinux和firewalld,并且安装java环境
1 | [root@linux-host3 ~]# systemctl stop firewalld |
安装logstash
1 | [root@localhost tools]# yum -y install logstash-5.6.13.rpm |
测试logstash
测试标准输入和输出
1 | [root@localhost tools]# /usr/share/logstash/bin/logstash -e 'input { stdin{} } output { stdout{ codec => rubydebug }}' |
测试输出到文件
1 | [root@localhost tools]# /usr/share/logstash/bin/logstash -e 'put { stdin{} } output { file { path => "/tmp/log-%{+YYYY.MM.dd}messages.gz"}}' |
测试输出到elasticsearch
1 | [root@localhost tools]# /usr/share/logstash/bin/logstash -e 'input { stdin{} } output { elasticsearch {hosts => ["192.168.34.100:9200"] index => "mytest-%{+YYYY.MM.dd}" }}' |
elasticsearch服务器验证收到数据
1 | [root@localhost tools]# ll /data/esdata/nodes/0/indices/ |
kibana
监听5601端口
安装配置kibana
1 | [root@localhost tools]# yum -y install kibana-5.3.0-x86_64.rpm |
启动kibana服务并验证
1 | [root@localhost tools]# systemctl start kibana |
在客户端浏览器访问该页面就可以使用kibana。
如果默认没有显示柱状的图,可能是最近没有写入新的数据,可以查看较长日期当中的数据或者通过logstash新写入数据即可。
通过logstash收集日志
1 | [root@localhost conf.d]# cat system-log.conf |
通过logtsash收集tomcat和java日志
配置安装java环境,并自定义一个web界面进行测试
1 | [root@linux-host6 ~]# yum install jdk-8u121-linux-x64.rpm |
确认tomcat可以正常访问。
将tomcat日志转json
1 | [root@node02 tomcat]# vi conf/server.xml |
重启tomcat。
在tomcat主机安装logstash并编辑配置文件:
1 | [root@node02 logs]# cat /etc/logstash/conf.d/tomcat.conf |
启动logstash
1 | [root@node02 ~]# systemctl start logstash |
然后在kibana加入响应索引即可收集tomcat的访问日志。
关于sincedb:
1 | [root@node02 ~]# cat /var/lib/logstash/plugins/inputs/file/.sincedb_19224e5b5b842ee829563544811600e0 |
记录了手机文件的inode信息。
通过logstash收集java日志
使用codec的multiline插件实现多行匹配,这是一个可以将多行进行合并的插件,而且可以使用what指定将匹配到的行与前面的行合并还是和后面的行合并,https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html
在服务器部署logstash并生成测试文件进行测试
1 | input { |
配置读取日志文件并将输出改为elasticsearch
1 | [root@node02 conf.d]# cat java.conf |
然后重启一下elasticsearch服务,以生成新的日志,验证logstash能否自动收集新生成的日志。
然后配置kibana,创建新的索引,就可以浏览收集的日志。
通过logstash收集nginx的访问日志
部署nginx服务
1 | [root@linux-host6 ~]# yum install gcc gcc-c++ automake pcre pcre-devel zlip zlib-devel openssl openssl-devel |
编辑配置文件并准备web页面
将nginx的访问日志转为json格式
将nginx的访问日志转为json格式
1 | [root@node02 ~]# vi /etc/nginx/nginx.conf |
配置logstash收集nginx日志:
1 | [root@node02 conf.d]# pwd |
通过logstash收集TCP/UDP日志
通过logstash的tcp/udp插件收集日志,通常用于在向elasticsearch日志补录丢失的部分日志,可以将丢失的日志写到一个文件,然后通过TCP日志收集方式直接发送给logstash然后再写入到elasticsearch服务器。
https://www.elastic.co/guide/en/logstash/5.6/input-plugins.html
编辑logstash配置文件,进行日志收集测试
1 | [root@node02 conf.d]# cat tcp.conf |
此时端口9889处于监听状态,在其他服务器安装nc命令用于测试
1 | [root@node01 ~]# yum -y install nc |
通过nc发送一个文件:
1 | nc 192.168.15.16 9889 < /etc/passwd |
通过伪设备的方式发送消息
在类Unix操作系统中,块设备有硬盘、内存的硬件,但是还有设备节点并不一定要对应物理设备,我们把没有这种对应关系的设备是伪设备,比如/dev/null,/dev/zero,/dev/random以及/dev/tcp和/dev/upd等,Linux操作系统使用这些伪设备提供了多种不通的功能,tcp通信只是dev下面众多伪设备当中的一种设备。
1 | [root@linux-host1 ~]# echo "伪设备" > /dev/tcp/192.168.15.16/9889 |
将输出改为elasticsearch
1 | [root@node02 conf.d]# cat tcp.conf |
之后在kibana添加索引就可以查看收集的tcp日志
通过rsyslog收集haproxy日志
在centos 6及之前的版本叫做syslog,centos 7开始叫做rsyslog,根据官方的介绍,rsyslog(2013年版本)可以达到每秒转发百万条日志的级别,官方网址:http://www.rsyslog.com/,确认系统安装的版本命令如下:
1 | [root@node02 conf.d]# rpm -q rsyslog |
编辑haproxy和rsyslog的配置文件生成haproxy日志:
1 | [root@node02 conf.d]# vi /etc/haproxy/haproxy.cfg |
1 | [root@node02 rsyslog.d]# vi haproxy.conf |
之后重启haproxy和rsyslog,确保正常启动。
编辑logstash配置文件
将收集收集并传输至elasticsearch
1 | [root@node02 conf.d]# pwd |
之后在kibana计收入索引即可将haproxy的日志加入ELK。
logstash收集日志并写入redis
用一台服务器安装部署redis服务,专门用于写入日志缓存使用,用于web服务器产生大量日志的场景。
部署如下:
1 | [root@linux-host2 ~]# cd /usr/local/src/ |
启动redis并设置访问密码
1 | [root@node01 ~]# redis-cli -h 127.0.0.1 |
将logstash收集的日志写入redis
将tomcat服务器的logstash收集之后的tomcat 访问日志写入到redis服务器,然后通过另外的logstash将redis服务器的数据取出在写入到elasticsearch服务器。
官方文档:https://www.elastic.co/guide/en/logstash/current/plugins-outputs-redis.html
1 | [root@node02 conf.d]# cat tomcat_tcp.conf |
配置其他logstash服务器从redis读取数据
配置专门logstash服务器从redis读取指定的key的数据,并写入到elasticsearch。
1 | [root@node01 conf.d]# cat redis-es.conf |
此时,在kibana上添加指定的索引即可看到该数据。
logstash收集日志并写入kafka
kafka可以结合zookeeper更方便的实现扩容。
kafka将数据存放在分布式的存储系统zookeeper内,
安装kafka
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)
Partition
parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件
Producer
负责发布消息到Kafka broker
Consumer
消费消息。每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。
在三台服务器上分别安装启动kafka
1 | [root@linux-host1 src]# tar xvf kafka_2.11-1.0.0.tgz |
启动kafka
1 | ~]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties #以守护进程的方式启动 |
测试kafka
1 | [root@localhost local]# jps |
测试创建topic
创建名为logstashtest,partitions(分区)为3,replication(复制)为3的topic(主题):
在任意kafaka服务器操作:
1 | [root@node01 local]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34102:2181 --partitions 3 --replication-factor 3 --topic logstashtest |
测试获取topic
可以在任意一台kafka服务器上进行测试
1 | [root@localhost local]# kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34.102:2181 --topic logstashtest |
状态说明:logstashtest有三个分区分别为0、1、2,分区0的leader是3(broker.id),分区0有三个副本,并且状态都为lsr(ln-sync,表示可以参加选举成为leader)。
删除指定topic
1 | [root@localhost local]# kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34102:2181 --topic logstashtest |
获取所有topic
1 | [root@localhost local]# kafka/bin/kafka-topics.sh --list --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34.102:2181 |
kafka命令测试消息发送
创建topic
1 | [root@localhost local]# kafka/bin/kafka-topics.sh --create --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34.102:2181 --partitions 3 --replication-factor 3 --topic messagetest |
发送消息
1 | [root@localhost local]# kafka/bin/kafka-console-producer.sh --broker-list 192.168.34.100:9092,192.168.34.101:9092,192.168.34.102:9092 --topic messagetest |
在其他kafka服务器测试数据获取
1 | [root@node01 local]# kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.34.100:2181,192.168.34.101:2181,192.168.34.102:2181 --topic messagetest --from-beginning |