部署kafka集群
在3台主机上部署kafka_2.11-1.1.0集群。
软件版本
JDK 1.8 kafka_2.11-1.1.0
kafka简介
Kafka是一个开源的分布式消息引擎/消息中间件,同时Kafka也是一个流处理平台。Kakfa支持以发布/订阅的方式在应用间传递消息,同时并基于消息功能添加了Kafka Connect、Kafka Streams以支持连接其他系统的数据(Elasticsearch、Hadoop等) Kafka最核心的最成熟的还是他的消息引擎,所以Kafka大部分应用场景还是用来作为消息队列削峰平谷。另外,Kafka也是目前性能最好的消息中间件。
Kafka架构
在Kafka集群(Cluster)中,一个Kafka节点就是一个Broker,消息由Topic来承载,可以存储在1个或多个Partition中。发布消息的应用为Producer、消费消息的应用为Consumer,多个Consumer可以促成Consumer Group共同消费一个Topic中的消息。
| 概念/对象 | 简单说明 |
|---|---|
| Broker | Kafka节点 |
| Topic | 主题,用来承载消息 |
| Partition | 分区,用于主题分片存储 |
| Producer | 生产者,向主题发布消息的应用 |
| Consumer | 消费者,从主题订阅消息的应用 |
| Consumer Group | 消费者组,由多个消费者组成 |
准备工作
- Kafka服务器 准备3台CentOS服务器,并配置好静态IP、主机名(这里复用了zk和redis的节点)
| 服务器名 | IP | 说明 |
|---|---|---|
| redis-node3 | 10.1.241.201 | Kafka节点1 |
| zk02 | 10.1.241.202 | Kafka节点2 |
| zk03 | 10.1.241.203 | Kafka节点3 |
- ZooKeeper集群 Kakfa集群需要依赖ZooKeeper存储Broker、Topic等信息,这里我们部署三台ZK
| 服务器名 | IP | 说明 |
|---|---|---|
| redis-node3 | 10.1.241.201 | ZooKeeper节点1 |
| zk02 | 10.1.241.202 | ZooKeeper节点2 |
| zk03 | 10.1.241.203 | ZooKeeper节点3 |
部署过程
创建应用&数据目录
#创建应用目录
mkdir /usr/kafka
#创建Kafka数据目录
mkdir -p /data/kafka/logs
chmod 777 -R /data/kafka
下载&解压
cd /home/puaiuc
wget https://archive.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -xzvf kafka_2.11-1.1.0.tgz -C /usr/kafka
Kafka节点配置
#进入应用目录
cd /usr/kafka/kafka_2.11-1.1.0/
#修改配置文件
vi config/server.properties
- 通用配置
配置日志目录、指定ZooKeeper服务器
配置日志目录、指定ZooKeeper服务器
#A comma separated list of directories under which to store log files
log.dirs=/data/kafka/logs
#root directory for all kafka znodes.
zookeeper.connect=10.1.241.201:2181,10.1.241.202:2181,10.1.241.203:2181
- 分节点配置
redis-node3
broker.id=0
listeners=PLAINTEXT://10.1.241.201:9092
zk02
broker.id=1
listeners=PLAINTEXT://10.1.241.202:9092
zk03
broker.id=2
listeners=PLAINTEXT://10.1.241.203:9092
防火墙配置
sudo /sbin/iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
启动Kafka
cd /usr/kafka/kafka_2.11-1.1.0/
/bin/kafka-server-start.sh config/server.properties &
#启动成功后日志显示
[2021-04-22 14:40:26,292] INFO Kafka version : 1.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2021-04-22 14:40:26,292] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2021-04-22 14:40:26,294] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
Kafka测试
创建Topic
在redis-node3(Broker)上创建测试Tpoic:test-ken-io,这里我们指定了3个副本、1个分区:
bin/kafka-topics.sh --create --zookeeper 10.1.241.201:2181,10.1.241.202:2181,10.1.241.203:2181 --replication-factor 3 --partitions 1 --topic test
Topic在redis-node3上创建后也会同步到集群中另外两个Broker:zk02、zk03
查看Topic
bin/kafka-topics.sh --list --zookeeper 10.1.241.201:2181
bin/kafka-topics.sh --list --zookeeper 10.1.241.202:2181
bin/kafka-topics.sh --list --zookeeper 10.1.241.203:2181
发送消息
bin/kafka-console-producer.sh --broker-list 10.1.241.202:9092 --topic test
#消息内容
>test by ken.io
消费消息
bin/kafka-console-consumer.sh --bootstrap-server 10.1.241.201:9092 --topic test --from-beginning
bin/kafka-console-consumer.sh --bootstrap-server 10.1.241.203:9092 --topic test --from-beginning
然后均能收到消息test by ken.io
Kafka常用Broker配置说明
| 配置项 | 默认值/示例值 | 说明 |
|---|---|---|
| broker.id | 0 | Broker唯一标识 |
| listeners | PLAINTEXT://192.168.88.53:9092 | 监听信息,PLAINTEXT表示明文传输 |
| log.dirs | kafka/logs | kafka数据存放地址,可以填写多个。用”,”间隔 |
| message.max.bytes | message.max.bytes | 单个消息长度限制,单位是字节 |
| num.partitions | 1 | 默认分区数 |
| log.flush.interval.messages | Long.MaxValue | 在数据被写入到硬盘和消费者可用前最大累积的消息的数量 |
| log.flush.interval.ms | Long.MaxValue | 在数据被写入到硬盘前的最大时间 |
| log.flush.scheduler.interval.ms | Long.MaxValue | 检查数据是否要写入到硬盘的时间间隔。 |
| log.retention.hours | 24 | 控制一个log保留时间,单位:小时 |
| zookeeper.connect | 192.168.88.21:2181 | ZooKeeper服务器地址,多台用”,”间隔 |
2021-04-22