Kafka集群搭建

一、kafka安装说明

1.1 kafka介绍

Kafka 是一个分布式的流处理平台。一个流处理平台通常有以下特点 :

  • 发布和订阅消息流(类似于消息队列或者企业级的消息系统)
  • 以容错的、持久的方式存储消息流
  • 当消息流到来的时候,处理消息
1.2 kafka应用
  • 数据推送
  • 作为大缓冲区使用
  • 日志采集
  • 服务中间件
1.3 kafka集群

  • Kafka集群依赖于Zookeeper进行协调,并且在早期的Kafka版本中很多数据都是存放在Zookeeper的
  • Kafka节点只要注册到同一个Zookeeper上就代表它们是同一个集群的
  • Kafka通过brokerId来区分集群中的不同节点
  • Kafka是基于Zookeeper来实现分布式协调的,所以在搭建Kafka节点之前需要先搭建好Zookeeper节点;
  • Zookeeper和Kafka都依赖于JDK环境
1.3 kafka角色
  • Broker:一般指Kafka的部署节点
  • Leader:用于处理消息的接收和消费等请求,也就是说producer是将消息push到leader,而consumer也是从leader上去poll消息
  • Follower:主要用于备份消息数据,一个leader会有多个follower

二、Kafka集群搭建

2.1 节点规划
主机名 IP 组件
node1 10.0.0.11 kafka1,zk1,java
node2 10.0.0.12 kafka2,zk2,java
node3 10.0.0.13 kafka2,zk2,java
2.2 安装jdk
cat >> /etc/hosts <<eof
10.0.0.11 node1
10.0.0.12 node2
10.0.0.13 node3
eof
[root@node1 tempfile]# java -version
java version "1.8.0_241"
Java(TM) SE Runtime Environment (build 1.8.0_241-b07)
Java HotSpot(TM) 64-Bit Server VM (build 25.241-b07, mixed mode)
2.3 解压安装
wget -c https://mirrors.bfsu.edu.cn/apache/kafka/2.8.0/kafka_2.12-2.8.0.tgz
tar xf kafka_2.12-2.8.0.tgz -C /data/software/
echo "export PATH=/data/software/kafka_2.12-2.8.0/bin:\$PATH" >> /etc/profile
source /etc/profile
# 创建zk数据和日志目录
mkdir -p /data/software/kafka_2.12-2.8.0/data/{zk,kafka}
mkdir -p /data/software/kafka_2.12-2.8.0/logs/{kafka,zk}
2.4 zk配置
# kafka的二进制包里面已经内置了zk组件,所以直接修改kafka中的zk配置即可
vim /data/software/kafka_2.12-2.8.0/config/zookeeper.properties
# 所以节点上的zk配置一致即可,这里做集群是为了保证zk的高可用
dataDir=/data/software/kafka_2.12-2.8.0/data/zk
dataLogDir=/data/software/kafka_2.12-2.8.0/logs/zk
clientPort=2181
maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
admin.enableServer=false
server.1=10.0.0.11:2881:3881
server.2=10.0.0.12:2881:3881
server.3=10.0.0.13:2881:3881
# 定义节点id值,每个节点配置的id值与server.id中的一致即可;
# 在每个zookeeper的 data 目录下创建一个myid文件;
echo 1 > /data/software/kafka_2.12-2.8.0/data/zk/myid # node1配置
echo 2 > /data/software/kafka_2.12-2.8.0/data/zk/myid # node2配置
echo 3 > /data/software/kafka_2.12-2.8.0/data/zk/myid # node3配置
chmod 600 /data/software/kafka_2.12-2.8.0/data/zk/myid
# 后台启动zk集群服务
zookeeper-server-start.sh -daemon /data/software/kafka_2.12-2.8.0/config/zookeeper.properties

# 需要停止zk服务执行
zookeeper-server-stop.sh /data/software/kafka_2.12-2.8.0/config/zookeeper.properties
2.5 kafka配置
vim /data/software/kafka_2.12-2.8.0/config/server.properties
# node1节点配置,注意broker的id值是唯一的
#broker 的全局唯一编号,不能重复
broker.id=0
#处理网络请求的线程数量
num.network.threads=3
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径
log.dirs=/data/software/kafka_2.12-2.8.0/logs/kafka
#topic 在当前 broker 上的分区个数
num.partitions=2
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#配置连接 Zookeeper 集群地址
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
#开启 删除 topic 功能
delete.topic.enable=true
#绑定节点的名称,值为当前节点
host.name=node1
port=9092
# node2节点配置
broker.id=1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/software/kafka_2.12-2.8.0/logs/kafka
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node2
port=9092
# node3节点配置
broker.id=2
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/software/kafka_2.12-2.8.0/logs/kafka
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node1:2181,node2:2181,node3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node3
port=9092
2.6 启动kafka
# 后台运行kafka实例
kafka-server-start.sh -daemon /data/software/kafka_2.12-2.8.0/config/server.properties
jps

# 如果需要停止执行
kafka-server-stop.sh /data/software/kafka_2.12-2.8.0/config/server.properties
2.7 验证集群
# 创建topic为test的主题
kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --replication-factor 3 --partitions 3 --topic test

# 查看topic列表
kafka-topics.sh --list --zookeeper node1:2181
kafka-topics.sh --list --zookeeper node2:2181
kafka-topics.sh --list --zookeeper node3:2181

# 查看指定主题
kafka-topics.sh --describe --zookeeper node1:2181 --topic test

# 模拟生产者
kafka-console-producer.sh --broker-list node1:9092 --topic test

# 模拟消费者(新版本写法)
kafka-console-consumer.sh --bootstrap-server node2:9092 --topic test  --from-beginning