zookeeper+kafka

ELK日志收集分析

下载安装并验证zookeeper

kafka下载地址:http://kafka.apache.org/downloads.html
zookeeper下载地址:http://zookeeper.apache.org/releases.html

安装zookeeper
zookeeper集群特性:整个集群种只要有超过集群数量一半的zookeeper工作只正常的,那么整个集群对外就是可用的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
~]# yum -y install jdk-8u192-linux-x64.rpm
~]# cd /usr/local/src/
~]# tar xf zookeeper-3.4.13.tar.gz -C /usr/local/src/
~]# cd /usr/local/
~]# ln -sv ./src/zookeeper-3.4.13/ zookeeper
~]# cp zookeeper/conf/zoo{_sample,}.cfg
~]# mkdir /usr/local/zookeeper/data
~]# grep "^[a-Z]" /usr/local/zookeeper/conf/zoo.cfg
tickTime=2000 #服务器与服务器之间和客户端与服务器之间的单次心跳检测时间间隔,单位为毫秒
initLimit=5 #集群中leader服务器与follower服务器初始连接心跳次数,即多少个2000毫秒
syncLimit=5 # leader与follower之间连接完成之后,后期检测发送和应答的心跳次数,如果该follower 在设置的时间内(5*2000)不能与leader 进行通信,那么此 follower 将被视为不可用。
clientPort=2181 #客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求
dataDir=/usr/local/zookeeper/data #自定义的zookeeper保存数据的目录
server.1=192.168.15.211:2888:3888 #服务器编号=服务器IP:LF数据同步端口:LF选举端口
server.2=192.168.15.212:2888:3888
server.3=192.168.15.213:2888:3888
~]# echo "1" > /usr/local/zookeeper/data/myid

本集群共使用三台服务器组建,每台服务器除myid不同其余配置保持一样。
启动zookeeper并验证状态

1
2
3
4
5
6
7
8
9
[root@node02 local]# zookeeper/bin/zkServer.sh start
[root@node02 local]# zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader
[root@node01 local]# zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower

zookeeper简单操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#连接到任意节点生成数据:
[root@linux-host3 data]# /usr/local/zookeeper/bin/zkCli.sh -server 192.168.15.211:2181
WatchedEvent state:SyncConnected type:None path:null
create /test "hello"
Created /test
[zk: 192.168.15.211:2181(CONNECTED) 1]

#在其他zookeeper节点验证数据:
[root@linux-host2 src]# /usr/local/zookeeper/bin/zkCli.sh -server 192.168.15.212:2181
[zk: 192.168.15.212:2181(CONNECTED) 0] get /test
hello
cZxid = 0x100000004
ctime = Fri Dec 15 11:14:07 CST 2017
mZxid = 0x100000004
mtime = Fri Dec 15 11:14:07 CST 2017
pZxid = 0x100000004
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

安装测试kafka

Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker

Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)

Partition
parition是物理上的概念,每个topic包含一个或多个partition,创建topic时可指定parition数量。每个partition对应于一个文件夹,该文件夹下存储该partition的数据和索引文件

Producer
负责发布消息到Kafka broker

Consumer
消费消息。每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。使用consumer high level API时,同一topic的一条消息只能被同一个consumer group内的一个consumer消费,但多个consumer group可同时消费这一消息。

在三台服务器上分别安装启动kafka

1
2
3
4
5
6
7
[root@linux-host1 src]# tar xvf kafka_2.11-1.0.0.tgz
[root@linux-host1 src]# ln -sv /usr/local/src/kafka_2.11-1.0.0 /usr/local/kafka
[root@linux-host1 src]# vim /usr/local/kafka/config/server.properties
21 broker.id=1 #设置每个代理全局唯一的整数ID
31 listeners=PLAINTEXT://192.168.15.211:9092
103 log.retention.hours=24 #保留指定小时的日志内容
123 zookeeper.connect=192.168.15.211:2181,192.168.15.212:2181,192.168.15.213:2181 #所有的zookeeper地址

启动kafka

1
~]# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties  #以守护进程的方式启动

测试kafka

1
2
3
4
[root@localhost local]# jps
4789 Kafka
1383 QuorumPeerMain
5147 Jps

测试创建topic
创建名为logstashtest,partitions(分区)为3,replication(复制)为3的topic(主题):
在任意kafaka服务器操作:

1
[root@node01 local]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34102:2181 --partitions 3 --replication-factor 3 --topic logstashtest

测试获取topic
可以在任意一台kafka服务器上进行测试

1
2
3
4
5
[root@localhost local]# kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34.102:2181 --topic logstashtest
Topic:logstashtest PartitionCount:3 ReplicationFactor:3 Configs:
Topic: logstashtest Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Topic: logstashtest Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: logstashtest Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3

状态说明:logstashtest有三个分区分别为0、1、2,分区0的leader是3(broker.id),分区0有三个副本,并且状态都为lsr(ln-sync,表示可以参加选举成为leader)。

删除指定topic

1
2
3
[root@localhost local]# kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34102:2181 --topic logstashtest
Topic logstashtest is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

获取所有topic

1
[root@localhost local]# kafka/bin/kafka-topics.sh --list --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34.102:2181

kafka命令测试消息发送

创建topic

1
2
[root@localhost local]# kafka/bin/kafka-topics.sh --create --zookeeper 192.168.34.100:2181,192.168.34.101:2181,192.168.34.102:2181 --partitions 3 --replication-factor 3 --topic messagetest
Created topic "messagetest".

发送消息

1
2
3
4
5
6
7
8
[root@localhost local]# kafka/bin/kafka-console-producer.sh --broker-list 192.168.34.100:9092,192.168.34.101:9092,192.168.34.102:9092 --topic messagetest
>hello
>kafka
>logstash
>ss
>oo
>start
>

在其他kafka服务器测试数据获取

1
[root@node01 local]# kafka/bin/kafka-console-consumer.sh  --bootstrap-server 192.168.34.100:2181,192.168.34.101:2181,192.168.34.102:2181 --topic messagetest --from-beginning

使用logstash配置文件

编辑logstash配置文件

1
2