安装部署
集群规划如下,由于之前安装过ZooKeeper了,这里主要是安装Kafka
hadoop100 | hadoop101 | hadoop102 |
---|---|---|
zk | zk | zk |
kafka | kafka | kafka |
上传安装包
从官网下载安装包,放置到hadoop100的/opt/software下
cd /opt/software
tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/
cd /opt/module/
mv kafka_2.12-3.0.0/ kafka-3.0.0
配置文件
配置环境变量
sudo vim /etc/profile
# 添加如下
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka-3.0.0
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
修改配置文件
cd $KAFKA_HOME/config
vim server.properties
# 添加以下内容
#broker 的全局唯一编号,不能重复,只能是数字
broker.id=0
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志(数据)存放的路径,路径不需要提前创建
#kafka 自动帮你创建,可以配置多个磁盘路径
#路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka-3.0.0/datas
#topic 在当前 broker 上的分区个数
num.partitions=3
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
#检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper 集群地址
#在 zk 根目录下创建/kafka,方便管理
zookeeper.connect=hadoop100:2181,hadoop101:2181,hadoop102:2181/kafka
分发安装包
xsync /opt/module/kafka-3.0.0
然后分别在 hadoop101 和 hadoop102 上修改配置文件/opt/module/kafka-3.0.0/config/server.properties中的 broker.id=1、broker.id=2
broker.id 不得重复,整个集群中唯一
最后hadoop101 和 hadoop102都配置上Kafka的环境变量(操作如上)
启动集群
# 先启动 Zookeeper 集群,然后启动 Kafka(用的是之前部署ZooKeeper时创建的集群启动脚本)
zk.sh start
# 启动集群:依次在 hadoop100、hadoop101、hadoop102 节点上启动 Kafka
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# 关闭集群:依次在 hadoop100、hadoop101、hadoop102 节点上关闭Kafka
kafka-server-stop.sh
集群启停脚本
vim kf.sh
# 添加如下
#!/bin/bash
case $1 in
"start"){
for i in hadoop100 hadoop101 hadoop102
do
echo " --------启动 $i Kafka-------"
ssh $i "/opt/module/kafka-3.0.0/bin/kafka-server-start.sh -daemon /opt/module/kafka-3.0.0/config/server.properties"
done
};;
"stop"){
for i in hadoop100 hadoop101 hadoop102
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka-3.0.0/bin/kafka-server-stop.sh "
done
};;
esac
启动/停止集群命令
kf.sh start
kf.sh stop
注意: 停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。
启动失败(JAVA_HOME找不到)
如果脚本启动出现问题(经常是ssh方式去执行命令找不到JAVA_HOME)。类似的问题也出现在集群脚本启动Hadoop、Spark、ZooKeeper时,都可以用如下方法解决。
可以配置下.bashrc文件(这个文件在每次shell连接时都会加载一次,ssh执行命令也会加载)
vim ~/.bashrc
# 添加如下
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin
重置Kafka集群
# 删除三个节点的数据目录
rm -rf /opt/module/kafka-3.0.0/datas
# 进入ZooKeeper客户端
zkCli.sh
# 删除Kafka在ZooKeeper中的元数据
deleteall /kafka
Kafka 命令行操作
主题命令行操作
$KAFKA_HOME/bin/kafka-topics.sh
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
–topic <String: topic> | 操作的 topic 名称。 |
–create | 创建主题。 |
–delete | 删除主题。 |
–alter | 修改主题。 |
–list | 查看所有主题。 |
–describe | 查看主题详细描述。 |
–partitions <Integer: # of partitions> | 设置分区数。 |
–replication-factor<Integer: replication factor> | 设置分区副本。 |
–config <String: name=value> | 更新系统默认的配置 |
查看当前服务器中的所有 topic
kafka-topics.sh --bootstrap-server hadoop100:9092 --list
创建 first topic
kafka-topics.sh --bootstrap-server hadoop100:9092 --create --partitions 1 --replication-factor 3 --topic first
- –topic 定义 topic 名
- –replication-factor 定义副本数
- –partitions 定义分区数
查看 first 主题的详情
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic first
修改分区数(注意:分区数只能增加,不能减少)
kafka-topics.sh --bootstrap-server hadoop100:9092 --alter --topic first --partitions 3
再次查看 first 主题的详情
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic first
删除 topic
kafka-topics.sh --bootstrap-server hadoop100:9092 --delete --topic first
生产者命令行操作
查看操作生产者命令参数
$KAFKA_HOME/bin/kafka-console-producer.sh
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
–topic <String: topic> | 操作的 topic 名称。 |
**发送消息 **
kafka-console-producer.sh --bootstrap-server hadoop100:9092 --topic first
>hello world
>atguigu atguigu
消费者命令行操作
查看操作消费者命令参数
$KAFKA_HOME/bin/kafka-console-consumer.sh
参数 | 描述 |
---|---|
–bootstrap-server <String: server toconnect to> | 连接的 Kafka Broker 主机名称和端口号。 |
–topic <String: topic> | 操作的 topic 名称。 |
–from-beginning | 从头开始消费。 |
–group <String: consumer group id> | 指定消费者组名称。 |
消费消息
# 消费 first 主题中的数据。
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first
# 把主题中所有的数据都读取出来(包括历史数据)。
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --from-beginning --topic first
Kafka 生产者
生产者消息发送流程
发送原理
在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
生产者重要参数列表
参数名称 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的 broker 地 址 清 单 。 例 如hadoop100:9092,hadoop101:9092,hadoop102:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息。 |
key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。 |
buffer.memory RecordAccumulator | 缓冲区总大小,默认 32m。 |
batch.size | 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader 收到数据后应答。-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的。 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 |
enable.idempotence | 是否开启幂等性,默认 true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。 |
异步发送 API
普通异步发送
需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker
代码编写
导入依赖
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
</dependencies>
编写不带回调函数的 API 代码
package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducer {
public static void main(String[] args) {
// 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 指定对应的key和value的序列化类型 key.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1 创建kafka生产者对象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","cm"+i));
}
// 3 关闭资源
kafkaProducer.close();
}
}
测试
在 hadoop100 上开启 Kafka 消费者。
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first
在 IDEA 中执行代码,观察 hadoop100 控制台中是否接收到消息
cm 0
cm 1
cm 2
cm 3
cm 4
带回调函数的异步发送
回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试
package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallback {
public static void main(String[] args) throws InterruptedException {
// 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 指定对应的key和value的序列化类型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1 创建kafka生产者对象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 发送数据
for (int i = 0; i < 500; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "cm" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null){
System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
}
}
});
Thread.sleep(2);
}
// 3 关闭资源
kafkaProducer.close();
}
}
测试
在 hadoop102 上开启 Kafka 消费者。
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first
在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。
cm 0
cm 1
cm 2
cm 3
cm 4
...
在 IDEA 控制台观察回调信息。
主题:first->分区:0
主题:first->分区:0
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1
同步发送 API
只需在异步发送的基础上,再调用一下 get()方法即可。
package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class CustomProducerSync {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 指定对应的key和value的序列化类型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1 创建kafka生产者对象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","cm"+i)).get();
}
// 3 关闭资源
kafkaProducer.close();
}
}
测试
在 hadoop100 上开启 Kafka 消费者。
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first
在 IDEA 中执行代码,观察 hadoop100 控制台中是否接收到消息
cm 0
cm 1
cm 2
cm 3
cm 4
生产者分区
分区好处
生产者发送消息的分区策略
默认的分区器 DefaultPartitioner
在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky
partition that changes when the batch is full.
*
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {
… …
}
案例一
将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中
package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallbackPartitions01 {
public static void main(String[] args) throws InterruptedException {
// 配置
Properties properties = new Properties();
// 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<>(properties);
for (int i = 0; i < 5; i++) {
// 指定数据发送到 0 号分区,key 为空(IDEA 中 ctrl + p 查看参数)
kafkaProducer.send(new ProducerRecord<>("first",
0, "", "cm " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata,
Exception e) {
if (e == null) {
System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition()
);
} else {
e.printStackTrace();
}
}
});
}
kafkaProducer.close();
}
}
测试
在 hadoop100 上开启 Kafka 消费者。
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first
在 IDEA 中执行代码,观察 hadoop100 控制台中是否接收到消息。
cm 0
cm 1
cm 2
cm 3
cm 4
在 IDEA 控制台观察回调信息。
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
案例二
没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallbackPartitions02 {
public static void main(String[] args) throws InterruptedException {
// 配置
Properties properties = new Properties();
// 给 kafka 配置对象添加配置信息
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new
KafkaProducer<>(properties);
String[] keys = {"a", "b", "f"};
for (int i = 0; i < 3; i++) {
// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往对应的分区
kafkaProducer.send(new ProducerRecord<>("first",
keys[i],"cm " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e == null){
System.out.println(" 主题: " +
metadata.topic() + "->" + "分区:" + metadata.partition()
);
}else {
e.printStackTrace();
}
}
});
}
kafkaProducer.close();
}
}
测试
key="a"时,在控制台查看结果
主题:first->分区:1
key="b"时,在控制台查看结果
主题:first->分区:2
key="f"时,在控制台查看结果
主题:first->分区:0
自定义分区器
如果研发人员可以根据企业需求,自己重新实现分区器。
需求: 例如我们实现一个分区器实现,发送过来的数据中如果包含 cm,就发往 0 号分区,不包含 cm,就发往 1 号分区。
定义类实现 Partitioner 接口,重写 partition()方法。
package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取数据
String msgValues = value.toString();
int partition;
if (msgValues.contains("cm")){
partition = 0;
}else {
partition = 1;
}
return partition;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
在生产者的配置中添加分区器参数。
package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallbackPartitions {
public static void main(String[] args) throws InterruptedException {
// 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 指定对应的key和value的序列化类型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 关联自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());
// 1 创建kafka生产者对象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "","cm" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null){
System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
}
}
});
Thread.sleep(2);
}
// 3 关闭资源
kafkaProducer.close();
}
}
测试
在 hadoop100 上开启 Kafka 消费者。
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first
在 IDEA 控制台观察回调信息。
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
生产经验——生产者如何提高吞吐量
package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerParameters {
public static void main(String[] args) {
// 配置
Properties properties = new Properties();
// 连接kafka集群
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 序列化
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 缓冲区大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
// 批次大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
// linger.ms
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
// 压缩
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
// 1 创建生产者
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","cm"+i));
}
// 3 关闭资源
kafkaProducer.close();
}
}
测试
在 hadoop100 上开启 Kafka 消费者。
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first
在 IDEA 中执行代码,观察 hadoop100 控制台中是否接收到消息。
cm 0
cm 1
cm 2
cm 3
cm 4
生产经验——数据可靠性
回顾发送流程
ack 应答原理
package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerAcks {
public static void main(String[] args) {
// 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 指定对应的key和value的序列化类型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// acks
properties.put(ProducerConfig.ACKS_CONFIG,"1");
// 重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,3);
// 1 创建kafka生产者对象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first","cm"+i));
}
// 3 关闭资源
kafkaProducer.close();
}
}
生产经验——数据去重
数据传递语义
幂等性
幂等性原理
如何使用幂等性
开启参数 enable.idempotence 默认为 true,false 关闭。
生产者事务
Kafka 事务原理
Kafka 的事务一共有如下 5 个 API
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;
单个 Producer,使用事务保证消息的仅一次发送
package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerTranactions {
public static void main(String[] args) {
// 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092,hadoop101:9092");
// 指定对应的key和value的序列化类型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 指定事务id
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");
// 1 创建kafka生产者对象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
try {
// 2 发送数据
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "cm" + i));
}
// 人造一个错误,看看控制台是否提交数据
int i = 1 / 0;
kafkaProducer.commitTransaction();
} catch (Exception e) {
kafkaProducer.abortTransaction();
} finally {
// 3 关闭资源
kafkaProducer.close();
}
}
}
测试
在 hadoop100 上开启 Kafka 消费,看看注释int i = 1 / 0;
与否,控制台是否打印信息
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first
生产经验——数据有序
生产经验——数据乱序
Kafka Broker
Kafka Broker 工作流程
Zookeeper 存储的 Kafka 信息
zkCli.sh
# 进入到zk客户端,Kafka储存在zk的信息在/kafka下
ls /kafka
Kafka Broker 总体工作流程
模拟 Kafka 上下线,Zookeeper 中数据变化
# 进入到zk客户端
# 查看/kafka/brokers/ids 路径上的节点。
ls /kafka/brokers/ids
# 执行正常显示
[0, 1, 2]
# 查看/kafka/controller 路径上的数据。
get /kafka/controller
# 执行正常显示
{"version":1,"brokerid":0,"timestamp":"1637292471777"}
# 查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据
get /kafka/brokers/topics/first/partitions/0/state
# 执行正常显示
{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1,2]}
# 停止 hadoop102 上的 kafka(在hadoop102的linux环境下执行)
kafka-server-stop.sh
# 再次查看/kafka/brokers/ids 路径上的节点。
ls /kafka/brokers/ids
# 执行正常显示
[0, 1]
# 再次查看/kafka/controller 路径上的数据。
get /kafka/controller
# 执行正常显示
{"version":1,"brokerid":0,"timestamp":"1637292471777"}
# 再次查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。
get /kafka/brokers/topics/first/partitions/0/state
# 执行正常显示{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1]}
# 启动 hadoop102 上的 kafka。
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# 再次观察/kafka的内容。
Broker 重要参数
参数名称 | 描述 |
---|---|
replica.lag.time.max.ms | ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s。 |
auto.leader.rebalance.enable | 默认是 true。 自动 Leader Partition 平衡。 |
eader.imbalance.per.broker.percentage | 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。 |
leader.imbalance.check.interval.seconds | 默认值 300 秒。检查 leader 负载是否平衡的间隔时 |
间。 | |
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G。 |
log.index.interval.bytes | 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 |
log.retention.hours | Kafka 中数据保存的时间,默认 7 天。 |
log.retention.minutes | Kafka 中数据保存的时间,分钟级别,默认关闭。 |
log.retention.ms | Kafka 中数据保存的时间,毫秒级别,默认关闭。 |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是 5 分钟。 |
log.retention.bytes | 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。 |
log.cleanup.policy | 默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策略。 |
num.io.threads | 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。 num.replica.fetchers 副本拉取线程数,这个参数占总核数的 50%的 1/3 |
num.network.threads | 默认是 3。数据传输线程数,这个参数占总核数的50%的 2/3 。 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。 |
生产经验——节点服役和退役
服役新节点
新节点准备
关闭 hadoop102,并右键执行克隆操作。并设置主机名 hadoop103,并修改 IP 地址。
vim /etc/sysconfig/network-scripts/ifcfg-ens33
# 就改了IPADDR这一出的Ip地址
# 完整内容如下
TYPE="Ethernet"
PROXY_METHOD="none"
BROWSER_ONLY="no"
BOOTPROTO="static"
DEFROUTE="yes"
IPV4_FAILURE_FATAL="no"
IPV6INIT="yes"
IPV6_AUTOCONF="yes"
IPV6_DEFROUTE="yes"
IPV6_FAILURE_FATAL="no"
IPV6_ADDR_GEN_MODE="stable-privacy"
NAME="ens33"
UUID="3fc01f0a-bd12-4b1b-85e1-a5785b26ac55"
DEVICE="ens33"
ONBOOT="yes"
IPADDR=192.168.217.103
GATEWAY=192.168.217.2
DNS1=8.8.8.8
DNS2=114.114.114.114
# 修改主机名
vim /etc/hostname
# 完整内容如下
hadoop103
# VmWare上重启虚拟机hadoop102和hadoop103
修改 haodoop105 中 kafka 的 broker.id 为 3
vim $KAFKA_HOME/config/server.properties
# 修改如下
broker.id=3
删除 hadoop105 中 kafka 下的 datas 和 logs
rm -rf $KAFKA_HOME/datas/* $KAFKA_HOME/logs/*
启动 hadoop100、hadoop101、hadoop102 上的 kafka 集群
# 使用集群启停脚本
zk.sh start
kf.sh start
单独启动 hadoop103 中的 kafka。
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
执行负载均衡操作
在hadoop100下,创建一个要均衡的主题
vim $KAFKA_HOME/topics-to-move.json
# 完整内容如下
{
"topics": [
{"topic": "first"}
],
"version": 1
}
生成一个负载均衡的计划。
kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --topics-to-move-json-file $KAFKA_HOME/topics-to-move.json --broker-list "0,1,2,3" --generate
# 执行正常显示
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)。
vim $KAFKA_HOME/increase-replication-factor.json
# 完整内容如下
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}
执行副本存储计划。
kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --execute
验证副本存储计划。
kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --verify
# 执行正常显示
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.
Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first
退役旧节点
执行负载均衡操作
先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。
创建一个要均衡的主题。
vim $KAFKA_HOME/topics-to-move.json
# 完整内容如下
{
"topics": [
{"topic": "first"}
],
"version": 1
}
创建执行计划。
kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --topics-to-move-json-file $KAFKA_HOME/topics-to-move.json --broker-list "0,1,2" --generate
# 执行正常显示
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,2,3],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}
创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)
vim $KAFKA_HOME/increase-replication-factor.json
# 完整内容如下
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}
执行副本存储计划。
kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --execute --additional
验证副本存储计划。(出现报错java.net.UnknownHostException: hadoop103,在hadoop100上配一下/etc/hosts就行了,不用source)
kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --verify
# 执行正常显示
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.
Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first
执行停止命令
在 hadoop103 上执行停止命令即可。
kafka-server-stop.sh
Kafka 副本
副本基本信息
- Kafka 副本作用:提高数据可靠性。
- Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
- Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。
- Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。AR = ISR + OSR
- ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的Leader。
- OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本
Leader 选举流程
Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。Controller 的信息同步工作是依赖于 Zookeeper 的。
创建一个新的 topic,4 个分区,4 个副本
把上述的hadoop103节点的Kafka启动起来,然后执行
kafka-topics.sh --bootstrap-server hadoop100:9092 --create --topic cm1 --partitions 4 --replication-factor 4
# 执行正常显示
Created topic cm1.
查看 Leader 分布情况
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic cm1
# 执行正常显示
Topic: cm1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: cm1 Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
Topic: cm1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: cm1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
Topic: cm1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3
停止掉 hadoop103 的 kafka 进程,并查看 Leader 分区情况
# hadoop103处执行
kafka-server-stop.sh
# hadoop100处执行
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic cm1
# 执行正常显示
Topic: cm1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: cm1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
Topic: cm1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
Topic: cm1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
Topic: cm1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0
停止掉 hadoop102 的 kafka 进程,并查看 Leader 分区情况
# hadoop102处执行
kafka-server-stop.sh
# hadoop100处执行
kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic cm1
# 执行正常显示
Topic: cm1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: cm1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
Topic: cm1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
Topic: cm1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
Topic: cm1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0
启动 hadoop103 的 kafka 进程,并查看 Leader 分区情况
# hadoop103处执行
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# hadoop100处执行
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic cm1
# 执行正常显示
Topic: cm1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: cm1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3
Topic: cm1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3
Topic: cm1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3
Topic: cm1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3
启动 hadoop102 的 kafka 进程,并查看 Leader 分区情况
# hadoop102处执行
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# hadoop100处执行
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic cm1
# 执行正常显示
Topic: cm1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: cm1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2
Topic: cm1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2
Topic: cm1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2
Topic: cm1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,2
停止掉 hadoop101 的 kafka 进程,并查看 Leader 分区情况
# hadoop101处执行
kafka-server-stop.sh
# hadoop100处执行
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic cm1
# 执行正常显示
Topic: cm1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: cm1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,3,2
Topic: cm1 Partition: 1 Leader: 2 Replicas: 1,2,3,0 Isr: 0,3,2
Topic: cm1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,2
Topic: cm1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 0,3,2
Leader 和 Follower 故障处理细节
分区副本分配
如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka底层如何分配存储副本呢?
创建 16 分区,3 个副本
创建一个新的 topic,名称为 second。
kafka-topics.sh --bootstrap-server hadoop100:9092 --create --partitions 16 --replication-factor 3 --topic second
查看分区和副本情况。
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic second
# 执行正常显示
Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
生产经验——手动调整分区副本存储
手动调整分区副本存储的步骤如下:
创建一个新的 topic,名称为 three。
kafka-topics.sh --bootstrap-server hadoop100:9092 --create --partitions 4 --replication-factor 2 --topic three
查看分区副本存储情况。
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic three
创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。
vim $KAFKA_HOME/increase-replication-factor.json
# 完整内容如下
{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},{"topic":"three","partition":1,"replicas":[0,1]},{"topic":"three","partition":2,"replicas":[1,0]},{"topic":"three","partition":3,"replicas":[1,0]}] }
执行副本存储计划。
kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --execute
验证副本存储计划。
kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --verify
查看分区副本存储情况。
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic three
生产经验——Leader Partition 负载平衡
参数名称 | 描述 |
---|---|
auto.leader.rebalance.enable | 默认是 true。 自动 Leader Partition 平衡。生产环境中,leader 重选举的代价比较大,可能会带来性能影响,建议设置为 false 关闭。 |
leader.imbalance.per.broker.percentage | 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。 |
leader.imbalance.check.interval.seconds | 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。 |
生产经验——增加副本因子
在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。
创建 topic
kafka-topics.sh --bootstrap-server hadoop100:9092 --create --partitions 3 --replication-factor 1 --topic four
手动增加副本存储
创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)
vim $KAFKA_HOME/increase-replication-factor.json
# 完整内容如下
{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}
执行副本存储计划。
kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --execute
文件存储
文件存储机制
Topic 数据的存储机制
思考:Topic 数据到底存储在什么位置?
启动生产者,并发送消息。
kafka-console-producer.sh --bootstrap-server hadoop100:9092 --topic first
>hello world
查看 hadoop102(或者 hadoop103、hadoop104)的/opt/module/kafka-3.0.0/datas/first-1 (first-0、first-2)路径上的文件。
ll /opt/module/kafka-3.0.0/datas/first-1
# 执行正常显示
-rw-rw-r-- 1 cm cm 10485760 2月 27 11:43 00000000000000000000.index
-rw-rw-r-- 1 cm cm 667 2月 27 11:43 00000000000000000000.log
-rw-rw-r-- 1 cm cm 10485756 2月 27 11:43 00000000000000000000.timeindex
-rw-rw-r-- 1 cm cm 13 2月 27 12:05 leader-epoch-checkpoint
-rw-rw-r-- 1 cm cm 43 2月 27 11:43 partition.metadata
直接查看 log 日志,发现是乱码。
cat /opt/module/kafka-3.0.0/datas/first-1/00000000000000000000.log
\CYnF|©|©ÿÿÿÿÿÿÿÿÿÿÿÿÿÿ"hello world
通过工具查看 index 和 log 信息。
cd /opt/module/kafka-3.0.0/datas/first-1
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
# 执行正常显示
offset: 0 position: 0
Mismatches in :/opt/module/kafka-3.0.0/datas/first-1/./00000000000000000000.index
Index offset: 0, log offset: 1
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
Dumping ./00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1677423438331 size: 81 magic: 2 compresscodec: none crc: 1132630391 isvalid: true
baseOffset: 2 lastOffset: 3 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 81 CreateTime: 1677423438340 size: 81 magic: 2 compresscodec: none crc: 4175274034 isvalid: true
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 162 CreateTime: 1677423438343 size: 71 magic: 2 compresscodec: none crc: 2382795792 isvalid: true
baseOffset: 5 lastOffset: 7 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 233 CreateTime: 1677423456391 size: 91 magic: 2 compresscodec: none crc: 1439649198 isvalid: true
baseOffset: 8 lastOffset: 9 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 324 CreateTime: 1677423456398 size: 81 magic: 2 compresscodec: none crc: 4229859629 isvalid: true
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 405 CreateTime: 1677423500141 size: 73 magic: 2 compresscodec: none crc: 1899802602 isvalid: true
baseOffset: 11 lastOffset: 15 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 5 isTransactional: false isControl: false position: 478 CreateTime: 1677464904621 size: 111 magic: 2 compresscodec: none crc: 2446558136 isvalid: true
baseOffset: 16 lastOffset: 16 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 2 partitionLeaderEpoch: 5 isTransactional: true isControl: true position: 589 CreateTime: 1677465215911 size: 78 magic: 2 compresscodec: none crc: 253418711 isvalid: true
index 文件和 log 文件详解
日志存储参数配置
参数 | 描述 |
---|---|
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。 |
log.index.interval.bytes | 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引。 |
文件清理策略
Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。
- log.retention.hours,最低优先级小时,默认 7 天。
- log.retention.minutes,分钟。
- log.retention.ms,最高优先级毫秒。
- log.retention.check.interval.ms,负责设置检查周期,默认 5 分钟。
那么日志一旦超过了设置的时间,怎么处理呢?
Kafka 中提供的日志清理策略有 delete 和 compact 两种。
- delete 日志删除:将过期数据删除
- log.cleanup.policy = delete 所有数据启用删除策略
- 基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
- 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。log.retention.bytes,默认等于-1,表示无穷大。
思考: 如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?
- log.cleanup.policy = delete 所有数据启用删除策略
- compact 日志压缩
高效读写数据
-
Kafka 本身是分布式集群,可以采用分区技术,并行度高
-
读数据采用稀疏索引,可以快速定位要消费的数据
-
顺序写磁盘
Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
-
页缓存 + 零拷贝技术
参数 | 描述 |
---|---|
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理 |
Kafka 消费者
Kafka 消费方式
Kafka 消费者工作流程
消费者总体工作流程
消费者组原理
消费者重要参数
参数名称 | 描述 |
---|---|
bootstrap.servers | 向 Kafka 集群建立初始连接用到的 host/port 列表。 |
key.deserializer 和value.deserializer | 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。 |
group.id | 标记消费者所属的消费者组。 |
enable.auto.commit | 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 |
auto.offset.reset | 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | __consumer_offsets 的分区数,默认是 50 个分区。 |
heartbeat.interval.ms | Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms ,也不应该高于session.timeout.ms 的 1/3。 |
session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes | 默认 1 个字节。消费者获取服务器端一批消息最小的字节数。 |
fetch.max.wait.ms | 默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。 |
fetch.max.bytes | 默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认是 500 条 |
消费者 API
独立消费者案例(订阅主题)
需求
创建一个独立消费者,消费 first 主题中数据。
注意: 在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。
实现步骤
package ltd.cmjava.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
// 配置
Properties properties = new Properties();
// 连接 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");
// 设置分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
// 1 创建一个消费者 "", "hello"
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题 first
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 3 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
kafkaConsumer.commitAsync();
}
}
}
测试
- 在 IDEA 中执行消费者程序。
- 在 Kafka 集群控制台,创建 Kafka 生产者,并输入数据。
kafka-console-producer.sh --bootstrap-server hadoop100:9092 --topic first
>hello
- 在 IDEA 控制台观察接收到的数据。
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 22, offset = 20, CreateTime = 1677505762405, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)
独立消费者案例(订阅分区)
需求
创建一个独立消费者,消费 first 主题 0 号分区的数据。
实现步骤
package ltd.cmjava.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumerPartition {
public static void main(String[] args) {
// 配置
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
// 1 创建一个消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题对应的分区
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first",0));
kafkaConsumer.assign(topicPartitions);
// 3 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
测试
- 在 IDEA 中执行消费者程序。
- 在 IDEA 中执行生产者程序 CustomProducerCallback()在控制台观察生成几个 0 号分区的数据。
主题: first 分区: 0
主题: first 分区: 2
主题: first 分区: 2
...
- 在 IDEA 控制台,观察接收到的数据,只能消费到 0 号分区数据表示正确。
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 26, offset = 20, CreateTime = 1677505959577, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = cm24)
...
消费者组案例
需求
测试同一个主题的分区数据,只能由一个消费者组中的一个消费。
案例实操
- 在 IDEA 中同时启动多个消费者,也就是上述独立消费者案例 里面的消费者
package ltd.cmjava.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) {
// 配置
Properties properties = new Properties();
// 连接 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");
// 设置分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");
// 1 创建一个消费者 "", "hello"
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题 first
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 3 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
kafkaConsumer.commitAsync();
}
}
}
-
启动代码中的CustomProducerCallback发送消息,在 IDEA 控制台即可看到两个消费者在消费不同分区的数据
-
如果重新发送到一个全新的主题中,由于默认创建的主题分区数为 1,可以看到只能有一个消费者消费到数据。
生产经验——分区的分配以及再平衡
参数名称 | 描述 |
---|---|
heartbeat.interval.ms | Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms,也不应该高于session.timeout.ms 的 1/3。 |
session.timeout.ms | Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
partition.assignment.strategy | 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range + CooperativeSticky。Kafka 可以同时使用多个分区分配策略。可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky |
Range 以及再平衡
Range 分区策略原理
Range 分区分配策略案例
- 修改主题 first 为 7 个分区。
# 注意:分区数可以增加,但是不能减少。
kafka-topics.sh --bootstrap-server hadoop100:9092 --alter --topic first --partitions 7
- IDEA启动 3个 CustomConsumer 类。组名都为“test”,同时启动 3 个消费者。
- 启动 CustomProducerCallback 生产者,发送 500 条消息,随机发送到不同的分区
package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class CustomProducerCallback {
public static void main(String[] args) throws InterruptedException {
// 配置
Properties properties = new Properties();
// 连接集群 bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 指定对应的key和value的序列化类型 key.serializer
// properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 1 创建kafka生产者对象
// "" hello
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
// 2 发送数据
for (int i = 0; i < 500; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "cm" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null){
System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
}
}
});
Thread.sleep(2);
}
// 3 关闭资源
kafkaProducer.close();
}
}
说明: Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。
- 观看 3 个消费者分别消费哪些分区的数据。
Range 分区分配再平衡案例
- 停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。 1 号消费者:消费到 3、4 号分区数据。 2 号消费者:消费到 5、6 号分区数据。 0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
说明: 0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。 - 再次重新发送消息观看结果(45s 以后)。 1 号消费者:消费到 0、1、2、3 号分区数据。 2 号消费者:消费到 4、5、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配
RoundRobin 以及再平衡
RoundRobin 分区策略原理
RoundRobin 分区分配策略案例
- 依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin。
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
- 重启 3 个消费者,重复发送消息的步骤,观看分区结果。
RoundRobin 分区分配再平衡案例
- 停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
- 1 号消费者:消费到 2、5 号分区数据
- 2 号消费者:消费到 4、1 号分区数据
- 0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,分别由 1 号消费者或者 2 号消费者消费。
说明: 0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。 - 再次重新发送消息观看结果(45s 以后)。 1 号消费者:消费到 0、2、4、6 号分区数据;2 号消费者:消费到 1、3、5 号分区数据
说明: 消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配
Sticky 以及再平衡
粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
需求
设置主题为 first,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。
步骤
- 修改分区分配策略为粘性。
注意: 3 个消费者都应该注释掉,之后重启 3 个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。
// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);
- 使用同样的生产者发送 500 条消息。可以看到会尽量保持分区的个数近似划分分区。
Sticky 分区分配再平衡案例
- 停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
- 1 号消费者:消费到 2、5、3 号分区数据。
- 2 号消费者:消费到 4、6 号分区数据。
- 0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 1 号消费者或者 2 号消费者消费。
说明: 0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。 - 再次重新发送消息观看结果(45s 以后)。 1 号消费者:消费到 2、3、5 号分区数据。 2 号消费者:消费到 0、1、4、6 号分区数据。
说明: 消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配。
offset 位移
offset 的默认维护位置
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。
消费 offset 案例
__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。
- 在配置文件 $KAFKA_HOME/config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。
- 采用命令行方式,创建一个新的 topic。
kafka-topics.sh --bootstrap-server hadoop100:9092 --create --topic cm --partitions 2 --replication-factor 2
- 启动生产者往 cm 生产数据。
kafka-console-producer.sh --topic cm --bootstrap-server hadoop100:9092
- 启动消费者消费 cm 数据。
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic cm --group test
注意: 指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)。 - 查看消费者消费主题__consumer_offsets。
kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop100:9092 --consumer.config $KAFKA_HOME/config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
# 执行正常可以看到
[test5,first,0]::OffsetAndMetadata(offset=448, leaderEpoch=Optional[26], metadata=, commitTimestamp=1677506870611, expireTimestamp=None)
[test5,first,2]::OffsetAndMetadata(offset=357, leaderEpoch=Optional[22], metadata=, commitTimestamp=1677506871612, expireTimestamp=None)
自动提交 offset
参数名称 | 描述 |
---|---|
enable.auto.commit | 默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 |
消费者自动提交 offset
package ltd.cmjava.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumerAutoOffset {
public static void main(String[] args) {
// 配置
Properties properties = new Properties();
// 连接 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
// 自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 提交时间间隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
// 1 创建一个消费者 "", "hello"
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题 first
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 3 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
测试
kafka-console-producer.sh --topic first --bootstrap-server hadoop100:9092
手动提交 offset
同步提交 offset
由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交 offset 的示例
package ltd.cmjava.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumerByHandSync {
public static void main(String[] args) {
// 配置
Properties properties = new Properties();
// 连接 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
// 手动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
// 1 创建一个消费者 "", "hello"
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题 first
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 3 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
// 手动同步提交offset
kafkaConsumer.commitSync();
}
}
}
测试
kafka-console-producer.sh --topic first --bootstrap-server hadoop100:9092
异步提交 offset
虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。以下为异步提交 offset 的示例:
package ltd.cmjava.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumerByHandASync {
public static void main(String[] args) {
// 配置
Properties properties = new Properties();
// 连接 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
// 手动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
// 1 创建一个消费者 "", "hello"
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题 first
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 3 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
// 手动异步提交offset
kafkaConsumer.commitAsync();
}
}
}
测试
kafka-console-producer.sh --topic first --bootstrap-server hadoop100:9092
指定 Offset 消费
auto.offset.reset = earliest | latest | none 默认是 latest。 当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?
- earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
- latest(默认值):自动将偏移量重置为最新偏移量。
- none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
- 任意指定 offset 位移开始消费
package ltd.cmjava.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;
public class CustomConsumerSeek {
public static void main(String[] args) {
// 配置信息
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"+ UUID.randomUUID());
// 1 创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 指定位置进行消费
Set<TopicPartition> assignment = kafkaConsumer.assignment();
// 保证分区分配方案已经制定完毕
while (assignment.size() == 0){
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}
// 指定消费的offset
for (TopicPartition topicPartition : assignment) {
kafkaConsumer.seek(topicPartition,600);
}
// 3 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
测试
kafka-console-producer.sh --topic first --bootstrap-server hadoop100:9092
注意: 每次执行完,需要修改消费者组名;
指定时间消费
需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?
package ltd.cmjava.kafka.consumer;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.*;
public class CustomConsumerSeekTime {
public static void main(String[] args) {
// 配置信息
Properties properties = new Properties();
// 连接
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"+ UUID.randomUUID());
// 1 创建消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
// 指定位置进行消费
Set<TopicPartition> assignment = kafkaConsumer.assignment();
// 保证分区分配方案已经制定完毕
while (assignment.size() == 0){
kafkaConsumer.poll(Duration.ofSeconds(1));
assignment = kafkaConsumer.assignment();
}
// 希望把时间转换为对应的offset
HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();
// 封装对应集合
for (TopicPartition topicPartition : assignment) {
topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);
// 指定消费的offset
for (TopicPartition topicPartition : assignment) {
OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
}
// 3 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
测试
kafka-console-producer.sh --topic first --bootstrap-server hadoop100:9092
漏消费和重复消费
- 重复消费:已经消费了数据,但是 offset 没提交。
- 漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。
思考: 怎么能做到既不漏消费也不重复消费呢?详看消费者事务。
生产经验——消费者事务
生产经验——数据积压(消费者如何提高吞吐量)
参数名称 | 描述 |
---|---|
fetch.max.bytes | 默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次 poll 拉取数据返回消息的最大条数,默认是 500 条 |
评论区