Skip to content

keep yourself

部署kafka集群

在3台主机上部署kafka_2.11-1.1.0集群。

软件版本

JDK 1.8 kafka_2.11-1.1.0

kafka简介

Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据(Elasticsearch、Hadoop等) Kafka最核心的最成熟的还是他的消息引擎,所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka也是目前性能最好的消息中间件。

Kafka架构

Kafka架构图 在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。

概念/对象简单说明
BrokerKafka节点
Topic主题,用来承载消息
Partition分区,用于主题分片存储
Producer生产者,向主题发布消息的应用
Consumer消费者,从主题订阅消息的应用
Consumer Group消费者组,由多个消费者组成

准备工作

  • Kafka服务器 准备3台CentOS服务器,并配置好静态IP、主机名(这里复用了zk和redis的节点)
服务器名IP说明
redis-node310.1.241.201Kafka节点1
zk0210.1.241.202Kafka节点2
zk0310.1.241.203Kafka节点3
  • ZooKeeper集群 Kakfa集群需要依赖ZooKeeper存储Broker、Topic等信息,这里我们部署三台ZK
服务器名IP说明
redis-node310.1.241.201ZooKeeper节点1
zk0210.1.241.202ZooKeeper节点2
zk0310.1.241.203ZooKeeper节点3

部署过程

创建应用&数据目录

#创建应用目录
mkdir /usr/kafka
#创建Kafka数据目录
mkdir -p /data/kafka/logs
chmod 777 -R /data/kafka

下载&解压

kafka下载地址

cd /home/puaiuc
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -xzvf kafka_2.11-1.1.0.tgz -C /usr/kafka

Kafka节点配置

#进入应用目录
cd /usr/kafka/kafka_2.11-1.1.0/
#修改配置文件
vi config/server.properties
  • 通用配置

配置日志目录、指定ZooKeeper服务器

配置日志目录、指定ZooKeeper服务器
#A comma separated list of directories under which to store log files
log.dirs=/data/kafka/logs
#root directory for all kafka znodes.
zookeeper.connect=10.1.241.201:2181,10.1.241.202:2181,10.1.241.203:2181
  • 分节点配置

redis-node3

broker.id=0
listeners=PLAINTEXT://10.1.241.201:9092

zk02

broker.id=1
listeners=PLAINTEXT://10.1.241.202:9092

zk03

broker.id=2
listeners=PLAINTEXT://10.1.241.203:9092

防火墙配置

sudo /sbin/iptables -I INPUT -p tcp --dport 9092 -j ACCEPT

启动Kafka

cd /usr/kafka/kafka_2.11-1.1.0/
/bin/kafka-server-start.sh config/server.properties &
#启动成功后日志显示
[2021-04-22 14:40:26,292] INFO Kafka version : 1.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2021-04-22 14:40:26,292] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2021-04-22 14:40:26,294] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

Kafka测试

创建Topic

在redis-node3(Broker)上创建测试Tpoic:test-ken-io,这里我们指定了3个副本、1个分区:

bin/kafka-topics.sh --create --zookeeper 10.1.241.201:2181,10.1.241.202:2181,10.1.241.203:2181 --replication-factor 3 --partitions 1 --topic test

Topic在redis-node3上创建后也会同步到集群中另外两个Broker:zk02、zk03

查看Topic

bin/kafka-topics.sh --list --zookeeper 10.1.241.201:2181
bin/kafka-topics.sh --list --zookeeper 10.1.241.202:2181
bin/kafka-topics.sh --list --zookeeper 10.1.241.203:2181

发送消息

bin/kafka-console-producer.sh --broker-list  10.1.241.202:9092  --topic  test
#消息内容
>test by ken.io

消费消息

bin/kafka-console-consumer.sh --bootstrap-server 10.1.241.201:9092 --topic test --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server 10.1.241.203:9092 --topic test --from-beginning

然后均能收到消息test by ken.io

Kafka常用Broker配置说明

配置项默认值/示例值说明
broker.id0Broker唯一标识
listenersPLAINTEXT://192.168.88.53:9092监听信息,PLAINTEXT表示明文传输
log.dirskafka/logskafka数据存放地址,可以填写多个。用”,”间隔
message.max.bytesmessage.max.bytes单个消息长度限制,单位是字节
num.partitions1默认分区数
log.flush.interval.messagesLong.MaxValue在数据被写入到硬盘和消费者可用前最大累积的消息的数量
log.flush.interval.msLong.MaxValue在数据被写入到硬盘前的最大时间
log.flush.scheduler.interval.msLong.MaxValue检查数据是否要写入到硬盘的时间间隔。
log.retention.hours24控制一个log保留时间,单位:小时
zookeeper.connect192.168.88.21:2181ZooKeeper服务器地址,多台用”,”间隔