标签搜索

目 录CONTENT

文章目录

Kafka部署与基本使用

陈铭
2023-02-26 / 0 评论 / 0 点赞 / 159 阅读 / 17,025 字 / 正在检测是否收录...

安装部署

集群规划如下,由于之前安装过ZooKeeper了,这里主要是安装Kafka

hadoop100 hadoop101 hadoop102
zk zk zk
kafka kafka kafka

上传安装包

官网下载安装包,放置到hadoop100的/opt/software下

cd /opt/software
tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

cd  /opt/module/
mv kafka_2.12-3.0.0/ kafka-3.0.0

配置文件

配置环境变量

sudo vim /etc/profile
# 添加如下
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka-3.0.0
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

修改配置文件

cd $KAFKA_HOME/config

vim server.properties
# 添加以下内容
#broker 的全局唯一编号,不能重复,只能是数字
broker.id=0
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志(数据)存放的路径,路径不需要提前创建
#kafka 自动帮你创建,可以配置多个磁盘路径
#路径与路径之间可以用","分隔
log.dirs=/opt/module/kafka-3.0.0/datas
#topic 在当前 broker 上的分区个数
num.partitions=3
#用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#每个 topic 创建时的副本数,默认时 1 个副本
offsets.topic.replication.factor=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#每个 segment 文件的大小,默认最大 1G
log.segment.bytes=1073741824
#检查过期数据的时间,默认 5 分钟检查一次是否数据过期
log.retention.check.interval.ms=300000
#配置连接Zookeeper 集群地址
#在 zk 根目录下创建/kafka,方便管理
zookeeper.connect=hadoop100:2181,hadoop101:2181,hadoop102:2181/kafka

分发安装包

xsync /opt/module/kafka-3.0.0

然后分别在 hadoop101 和 hadoop102 上修改配置文件/opt/module/kafka-3.0.0/config/server.properties中的 broker.id=1、broker.id=2
broker.id 不得重复,整个集群中唯一

最后hadoop101 和 hadoop102都配置上Kafka的环境变量(操作如上)

启动集群

# 先启动 Zookeeper 集群,然后启动 Kafka(用的是之前部署ZooKeeper时创建的集群启动脚本)
zk.sh start 
# 启动集群:依次在 hadoop100、hadoop101、hadoop102 节点上启动 Kafka
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# 关闭集群:依次在 hadoop100、hadoop101、hadoop102 节点上关闭Kafka
kafka-server-stop.sh 

集群启停脚本

vim kf.sh
# 添加如下
#!/bin/bash
case $1 in
"start"){
 for i in hadoop100 hadoop101 hadoop102
 do
   echo " --------启动 $i Kafka-------"
   ssh $i "/opt/module/kafka-3.0.0/bin/kafka-server-start.sh -daemon /opt/module/kafka-3.0.0/config/server.properties"
 done
};;
"stop"){
 for i in hadoop100 hadoop101 hadoop102
 do
   echo " --------停止 $i Kafka-------"
   ssh $i "/opt/module/kafka-3.0.0/bin/kafka-server-stop.sh "
 done
};;
esac

启动/停止集群命令

kf.sh start
kf.sh stop

注意: 停止 Kafka 集群时,一定要等 Kafka 所有节点进程全部停止后再停止 Zookeeper集群。因为 Zookeeper 集群当中记录着 Kafka 集群相关信息,Zookeeper 集群一旦先停止,Kafka 集群就没有办法再获取停止进程的信息,只能手动杀死 Kafka 进程了。

启动失败(JAVA_HOME找不到)

如果脚本启动出现问题(经常是ssh方式去执行命令找不到JAVA_HOME)。类似的问题也出现在集群脚本启动Hadoop、Spark、ZooKeeper时,都可以用如下方法解决。

可以配置下.bashrc文件(这个文件在每次shell连接时都会加载一次,ssh执行命令也会加载)

vim ~/.bashrc
# 添加如下
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin

重置Kafka集群

# 删除三个节点的数据目录
rm -rf /opt/module/kafka-3.0.0/datas

# 进入ZooKeeper客户端
zkCli.sh
# 删除Kafka在ZooKeeper中的元数据
deleteall /kafka

Kafka 命令行操作

image-1677383058205

主题命令行操作

$KAFKA_HOME/bin/kafka-topics.sh
参数 描述
–bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
–topic <String: topic> 操作的 topic 名称。
–create 创建主题。
–delete 删除主题。
–alter 修改主题。
–list 查看所有主题。
–describe 查看主题详细描述。
–partitions <Integer: # of partitions> 设置分区数。
–replication-factor<Integer: replication factor> 设置分区副本。
–config <String: name=value> 更新系统默认的配置

查看当前服务器中的所有 topic

kafka-topics.sh --bootstrap-server hadoop100:9092 --list

创建 first topic

kafka-topics.sh --bootstrap-server hadoop100:9092 --create --partitions 1 --replication-factor 3 --topic first
  • –topic 定义 topic 名
  • –replication-factor 定义副本数
  • –partitions 定义分区数

查看 first 主题的详情

kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic first

修改分区数(注意:分区数只能增加,不能减少)

kafka-topics.sh --bootstrap-server hadoop100:9092 --alter --topic first --partitions 3

再次查看 first 主题的详情

kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic first

删除 topic

kafka-topics.sh --bootstrap-server hadoop100:9092 --delete --topic first

生产者命令行操作

查看操作生产者命令参数

$KAFKA_HOME/bin/kafka-console-producer.sh
参数 描述
–bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
–topic <String: topic> 操作的 topic 名称。

**发送消息 **

kafka-console-producer.sh --bootstrap-server hadoop100:9092 --topic first
>hello world
>atguigu atguigu

消费者命令行操作

查看操作消费者命令参数

$KAFKA_HOME/bin/kafka-console-consumer.sh
参数 描述
–bootstrap-server <String: server toconnect to> 连接的 Kafka Broker 主机名称和端口号。
–topic <String: topic> 操作的 topic 名称。
–from-beginning 从头开始消费。
–group <String: consumer group id> 指定消费者组名称。

消费消息

# 消费 first 主题中的数据。
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first
# 把主题中所有的数据都读取出来(包括历史数据)。
kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --from-beginning --topic first

Kafka 生产者

生产者消息发送流程

发送原理

在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。
image-1677385056794

生产者重要参数列表

参数名称 描述
bootstrap.servers 生产者连接集群所需的 broker 地 址 清 单 。 例 如hadoop100:9092,hadoop101:9092,hadoop102:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息。
key.serializer 和 value.serializer 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memory RecordAccumulator 缓冲区总大小,默认 32m。
batch.size 缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks 0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader 收到数据后应答。-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的。
max.in.flight.requests.per.connection 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。
retries 当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms 两次重试之间的时间间隔,默认是 100ms。
enable.idempotence 是否开启幂等性,默认 true,开启幂等性。
compression.type 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。

异步发送 API

普通异步发送

需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker
image-1677385087756

代码编写

导入依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

编写不带回调函数的 API 代码

package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducer {

    public static void main(String[] args) {

        // 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 指定对应的key和value的序列化类型 key.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","cm"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

测试
在 hadoop100 上开启 Kafka 消费者。

kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first

在 IDEA 中执行代码,观察 hadoop100 控制台中是否接收到消息

cm 0
cm 1
cm 2
cm 3
cm 4

带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
image-1677392755744
注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试

package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallback {

    public static void main(String[] args) throws InterruptedException {

        // 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 500; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "cm" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null){
                        System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

测试
在 hadoop102 上开启 Kafka 消费者。

kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first

在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

cm 0
cm 1
cm 2
cm 3
cm 4
...

在 IDEA 控制台观察回调信息。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:1
主题:first->分区:1
主题:first->分区:1

同步发送 API

image-1677393146493
只需在异步发送的基础上,再调用一下 get()方法即可。

package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducerSync {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","cm"+i)).get();
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

测试
在 hadoop100 上开启 Kafka 消费者。

kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first

在 IDEA 中执行代码,观察 hadoop100 控制台中是否接收到消息

cm 0
cm 1
cm 2
cm 3
cm 4

生产者分区

分区好处

image-1677393340892

生产者发送消息的分区策略

默认的分区器 DefaultPartitioner

在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {
 … …
}

image-1677393415574

案例一

将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中

package ltd.cmjava.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallbackPartitions01 {

    public static void main(String[] args) throws InterruptedException {

        // 配置
        Properties properties = new Properties();

        // 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<>(properties);
        for (int i = 0; i < 5; i++) {
            // 指定数据发送到 0 号分区,key 为空(IDEA 中 ctrl + p 查看参数)
            kafkaProducer.send(new ProducerRecord<>("first",
                    0, "", "cm " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata,
                                         Exception e) {
                    if (e == null) {
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition()
                        );
                    } else {
                        e.printStackTrace();
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

测试
在 hadoop100 上开启 Kafka 消费者。

kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first

在 IDEA 中执行代码,观察 hadoop100 控制台中是否接收到消息。

cm 0
cm 1
cm 2
cm 3
cm 4

在 IDEA 控制台观察回调信息。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

案例二

没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值

package ltd.cmjava.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallbackPartitions02 {

    public static void main(String[] args) throws InterruptedException {

        // 配置
        Properties properties = new Properties();

        // 给 kafka 配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092");
        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());

        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        KafkaProducer<String, String> kafkaProducer = new
                KafkaProducer<>(properties);
        String[] keys = {"a", "b", "f"};
        for (int i = 0; i < 3; i++) {
            // 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往对应的分区
            kafkaProducer.send(new ProducerRecord<>("first",
                    keys[i],"cm " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e == null){
                        System.out.println(" 主题: " +
                                metadata.topic() + "->" + "分区:" + metadata.partition()
                        );
                    }else {
                        e.printStackTrace();
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

测试

key="a"时,在控制台查看结果
主题:first->分区:1
key="b"时,在控制台查看结果
主题:first->分区:2
key="f"时,在控制台查看结果
主题:first->分区:0

自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器。
需求: 例如我们实现一个分区器实现,发送过来的数据中如果包含 cm,就发往 0 号分区,不包含 cm,就发往 1 号分区。
定义类实现 Partitioner 接口,重写 partition()方法。

package ltd.cmjava.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 获取数据
        String msgValues = value.toString();

        int partition;

        if (msgValues.contains("cm")){
            partition = 0;
        }else {
            partition = 1;
        }

        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

在生产者的配置中添加分区器参数。

package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallbackPartitions {

    public static void main(String[] args) throws InterruptedException {

        // 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 关联自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "","cm" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null){
                        System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

测试
在 hadoop100 上开启 Kafka 消费者。

kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first

在 IDEA 控制台观察回调信息。

主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0
主题:first->分区:0

生产经验——生产者如何提高吞吐量

package ltd.cmjava.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerParameters {

    public static void main(String[] args) {

        // 配置
        Properties properties = new Properties();

        // 连接kafka集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        // 批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // 压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");


        // 1 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","cm"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

测试
在 hadoop100 上开启 Kafka 消费者。

kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first

在 IDEA 中执行代码,观察 hadoop100 控制台中是否接收到消息。

cm 0
cm 1
cm 2
cm 3
cm 4

生产经验——数据可靠性

回顾发送流程
image-1677425169536
ack 应答原理
image-1677425180091

image-1677425192716

image-1677425217811

package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerAcks {

    public static void main(String[] args) {

        // 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // acks
        properties.put(ProducerConfig.ACKS_CONFIG,"1");

        // 重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG,3);

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","cm"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

生产经验——数据去重

数据传递语义

image-1677425271715

幂等性

幂等性原理
image-1677464969571
如何使用幂等性
开启参数 enable.idempotence 默认为 true,false 关闭。

生产者事务

Kafka 事务原理
image-1677465051052
Kafka 的事务一共有如下 5 个 API

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

单个 Producer,使用事务保证消息的仅一次发送

package ltd.cmjava.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerTranactions {

    public static void main(String[] args) {

        // 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop100:9092,hadoop101:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 指定事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tranactional_id_01");

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        kafkaProducer.initTransactions();

        kafkaProducer.beginTransaction();

        try {
            // 2 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("first", "cm" + i));
            }
            // 人造一个错误,看看控制台是否提交数据
            int i = 1 / 0;

            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            kafkaProducer.abortTransaction();
        } finally {
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
}

测试
在 hadoop100 上开启 Kafka 消费,看看注释int i = 1 / 0;与否,控制台是否打印信息

kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic first

生产经验——数据有序

image-1677465309072

生产经验——数据乱序

image-1677465325092

Kafka Broker

Kafka Broker 工作流程

Zookeeper 存储的 Kafka 信息

image-1677465474529

zkCli.sh
# 进入到zk客户端,Kafka储存在zk的信息在/kafka下
ls /kafka

Kafka Broker 总体工作流程

image-1677465540978
模拟 Kafka 上下线,Zookeeper 中数据变化

# 进入到zk客户端
# 查看/kafka/brokers/ids 路径上的节点。
ls /kafka/brokers/ids
# 执行正常显示
[0, 1, 2]

# 查看/kafka/controller 路径上的数据。
get /kafka/controller
# 执行正常显示
{"version":1,"brokerid":0,"timestamp":"1637292471777"}

# 查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据
get /kafka/brokers/topics/first/partitions/0/state
# 执行正常显示
{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1,2]} 

# 停止 hadoop102 上的 kafka(在hadoop102的linux环境下执行)
kafka-server-stop.sh

# 再次查看/kafka/brokers/ids 路径上的节点。
ls /kafka/brokers/ids
# 执行正常显示
[0, 1]

# 再次查看/kafka/controller 路径上的数据。
get /kafka/controller
# 执行正常显示
{"version":1,"brokerid":0,"timestamp":"1637292471777"}

# 再次查看/kafka/brokers/topics/first/partitions/0/state 路径上的数据。
get /kafka/brokers/topics/first/partitions/0/state
# 执行正常显示{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1]} 

# 启动 hadoop102 上的 kafka。
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
# 再次观察/kafka的内容。

Broker 重要参数

参数名称 描述
replica.lag.time.max.ms ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s。
auto.leader.rebalance.enable 默认是 true。 自动 Leader Partition 平衡。
eader.imbalance.per.broker.percentage 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔时
间。
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分 成块的大小,默认值 1G。
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。
log.retention.hours Kafka 中数据保存的时间,默认 7 天。
log.retention.minutes Kafka 中数据保存的时间,分钟级别,默认关闭。
log.retention.ms Kafka 中数据保存的时间,毫秒级别,默认关闭。
log.retention.check.interval.ms 检查数据是否保存超时的间隔,默认是 5 分钟。
log.retention.bytes 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。
log.cleanup.policy 默认是 delete,表示所有数据启用删除策略;如果设置值为 compact,表示所有数据启用压缩策略。
num.io.threads 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。 num.replica.fetchers 副本拉取线程数,这个参数占总核数的 50%的 1/3
num.network.threads 默认是 3。数据传输线程数,这个参数占总核数的50%的 2/3 。
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。

生产经验——节点服役和退役

服役新节点

新节点准备

关闭 hadoop102,并右键执行克隆操作。并设置主机名 hadoop103,并修改 IP 地址。

vim /etc/sysconfig/network-scripts/ifcfg-ens33
# 就改了IPADDR这一出的Ip地址
# 完整内容如下
TYPE="Ethernet"
PROXY_METHOD="none"
BROWSER_ONLY="no"
BOOTPROTO="static"
DEFROUTE="yes"
IPV4_FAILURE_FATAL="no"
IPV6INIT="yes"
IPV6_AUTOCONF="yes"
IPV6_DEFROUTE="yes"
IPV6_FAILURE_FATAL="no"
IPV6_ADDR_GEN_MODE="stable-privacy"
NAME="ens33"
UUID="3fc01f0a-bd12-4b1b-85e1-a5785b26ac55"
DEVICE="ens33"
ONBOOT="yes"
IPADDR=192.168.217.103
GATEWAY=192.168.217.2
DNS1=8.8.8.8
DNS2=114.114.114.114

# 修改主机名
vim /etc/hostname
# 完整内容如下
hadoop103

# VmWare上重启虚拟机hadoop102和hadoop103

修改 haodoop105 中 kafka 的 broker.id 为 3

vim $KAFKA_HOME/config/server.properties 
# 修改如下
broker.id=3

删除 hadoop105 中 kafka 下的 datas 和 logs

rm -rf $KAFKA_HOME/datas/* $KAFKA_HOME/logs/*

启动 hadoop100、hadoop101、hadoop102 上的 kafka 集群

# 使用集群启停脚本
zk.sh start
kf.sh start 

单独启动 hadoop103 中的 kafka。

kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

执行负载均衡操作

在hadoop100下,创建一个要均衡的主题

vim $KAFKA_HOME/topics-to-move.json
# 完整内容如下
{
 "topics": [
 {"topic": "first"}
 ],
 "version": 1
}

生成一个负载均衡的计划。

kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --topics-to-move-json-file $KAFKA_HOME/topics-to-move.json --broker-list "0,1,2,3" --generate
# 执行正常显示
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)。

vim $KAFKA_HOME/increase-replication-factor.json
# 完整内容如下
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

执行副本存储计划。

kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --execute

验证副本存储计划。

kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --verify
# 执行正常显示
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.
Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first

退役旧节点

执行负载均衡操作

先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。
创建一个要均衡的主题。

vim $KAFKA_HOME/topics-to-move.json
# 完整内容如下
{
 "topics": [
 {"topic": "first"}
 ],
 "version": 1
}

创建执行计划。

kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --topics-to-move-json-file $KAFKA_HOME/topics-to-move.json --broker-list "0,1,2" --generate
# 执行正常显示
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,2,3],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}

创建副本存储计划(所有副本存储在 broker0、broker1、broker2 中)

vim $KAFKA_HOME/increase-replication-factor.json
# 完整内容如下
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[0,1,2],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,2,0],"log_dirs":["any","any","any"]}]}

执行副本存储计划。

kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --execute --additional

验证副本存储计划。(出现报错java.net.UnknownHostException: hadoop103,在hadoop100上配一下/etc/hosts就行了,不用source)

kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --verify
# 执行正常显示
Status of partition reassignment:
Reassignment of partition first-0 is complete.
Reassignment of partition first-1 is complete.
Reassignment of partition first-2 is complete.
Clearing broker-level throttles on brokers 0,1,2,3
Clearing topic-level throttles on topic first

执行停止命令

在 hadoop103 上执行停止命令即可。

kafka-server-stop.sh

Kafka 副本

副本基本信息

  • Kafka 副本作用:提高数据可靠性。
  • Kafka 默认副本 1 个,生产环境一般配置为 2 个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  • Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader,然后 Follower 找 Leader 进行同步数据。
  • Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。AR = ISR + OSR
  • ISR,表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值由 replica.lag.time.max.ms参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的Leader。
  • OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本

Leader 选举流程

Kafka 集群中有一个 broker 的 Controller 会被选举为 Controller Leader,负责管理集群broker 的上下线,所有 topic 的分区副本分配和 Leader 选举等工作。Controller 的信息同步工作是依赖于 Zookeeper 的。
image-1677467951499

创建一个新的 topic,4 个分区,4 个副本
把上述的hadoop103节点的Kafka启动起来,然后执行

kafka-topics.sh --bootstrap-server hadoop100:9092 --create --topic cm1 --partitions 4 --replication-factor 4
# 执行正常显示
Created topic cm1. 

查看 Leader 分布情况

kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic cm1
# 执行正常显示
Topic: cm1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: cm1 Partition: 0 Leader: 3 Replicas: 3,0,2,1 Isr: 3,0,2,1
Topic: cm1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,3,0
Topic: cm1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,1,2
Topic: cm1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0,3

停止掉 hadoop103 的 kafka 进程,并查看 Leader 分区情况

# hadoop103处执行
kafka-server-stop.sh

# hadoop100处执行
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic cm1
# 执行正常显示
Topic: cm1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: cm1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,2,1
Topic: cm1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,2,0
Topic: cm1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,2
Topic: cm1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 2,1,0

停止掉 hadoop102 的 kafka 进程,并查看 Leader 分区情况

# hadoop102处执行
kafka-server-stop.sh

# hadoop100处执行
kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic cm1
# 执行正常显示
Topic: cm1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: cm1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1
Topic: cm1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0
Topic: cm1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1
Topic: cm1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0

启动 hadoop103 的 kafka 进程,并查看 Leader 分区情况

# hadoop103处执行
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# hadoop100处执行
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic cm1
# 执行正常显示
Topic: cm1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: cm1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3
Topic: cm1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3
Topic: cm1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3
Topic: cm1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3

启动 hadoop102 的 kafka 进程,并查看 Leader 分区情况

# hadoop102处执行
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

# hadoop100处执行
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic cm1
# 执行正常显示
Topic: cm1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: cm1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,1,3,2
Topic: cm1 Partition: 1 Leader: 1 Replicas: 1,2,3,0 Isr: 1,0,3,2
Topic: cm1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,1,3,2
Topic: cm1 Partition: 3 Leader: 1 Replicas: 2,1,0,3 Isr: 1,0,3,2

停止掉 hadoop101 的 kafka 进程,并查看 Leader 分区情况

# hadoop101处执行
kafka-server-stop.sh

# hadoop100处执行
kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic cm1
# 执行正常显示
Topic: cm1 TopicId: awpgX_7WR-OX3Vl6HE8sVg PartitionCount: 4 ReplicationFactor: 4
Configs: segment.bytes=1073741824
Topic: cm1 Partition: 0 Leader: 0 Replicas: 3,0,2,1 Isr: 0,3,2
Topic: cm1 Partition: 1 Leader: 2 Replicas: 1,2,3,0 Isr: 0,3,2
Topic: cm1 Partition: 2 Leader: 0 Replicas: 0,3,1,2 Isr: 0,3,2
Topic: cm1 Partition: 3 Leader: 2 Replicas: 2,1,0,3 Isr: 0,3,2

Leader 和 Follower 故障处理细节

image-1677468060414

image-1677468066465

分区副本分配

如果 kafka 服务器只有 4 个节点,那么设置 kafka 的分区数大于服务器台数,在 kafka底层如何分配存储副本呢?
创建 16 分区,3 个副本
创建一个新的 topic,名称为 second。

kafka-topics.sh --bootstrap-server hadoop100:9092 --create --partitions 16 --replication-factor 3 --topic second

查看分区和副本情况。

kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic second
# 执行正常显示
Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1
Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0
Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1

image-1677468091690

生产经验——手动调整分区副本存储

image-1677468108086
手动调整分区副本存储的步骤如下:
创建一个新的 topic,名称为 three。

kafka-topics.sh --bootstrap-server hadoop100:9092 --create --partitions 4 --replication-factor 2 --topic three

查看分区副本存储情况。

kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic three

创建副本存储计划(所有副本都指定存储在 broker0、broker1 中)。

vim $KAFKA_HOME/increase-replication-factor.json
# 完整内容如下
{"version":1,"partitions":[{"topic":"three","partition":0,"replicas":[0,1]},{"topic":"three","partition":1,"replicas":[0,1]},{"topic":"three","partition":2,"replicas":[1,0]},{"topic":"three","partition":3,"replicas":[1,0]}] } 

执行副本存储计划。

kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --execute

验证副本存储计划。

kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --verify

查看分区副本存储情况。

kafka-topics.sh --bootstrap-server hadoop100:9092 --describe --topic three

生产经验——Leader Partition 负载平衡

image-1677468131248

参数名称 描述
auto.leader.rebalance.enable 默认是 true。 自动 Leader Partition 平衡。生产环境中,leader 重选举的代价比较大,可能会带来性能影响,建议设置为 false 关闭。
leader.imbalance.per.broker.percentage 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。
leader.imbalance.check.interval.seconds 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。

生产经验——增加副本因子

在生产环境当中,由于某个主题的重要等级需要提升,我们考虑增加副本。副本数的增加需要先制定计划,然后根据计划执行。
创建 topic

kafka-topics.sh --bootstrap-server hadoop100:9092 --create --partitions 3 --replication-factor 1 --topic four

手动增加副本存储
创建副本存储计划(所有副本都指定存储在 broker0、broker1、broker2 中)

vim $KAFKA_HOME/increase-replication-factor.json
# 完整内容如下
{"version":1,"partitions":[{"topic":"four","partition":0,"replicas":[0,1,2]},{"topic":"four","partition":1,"replicas":[0,1,2]},{"topic":"four","partition":2,"replicas":[0,1,2]}]}

执行副本存储计划。

kafka-reassign-partitions.sh --bootstrap-server hadoop100:9092 --reassignment-json-file $KAFKA_HOME/increase-replication-factor.json --execute

文件存储

文件存储机制

Topic 数据的存储机制
image-1677468173071
思考:Topic 数据到底存储在什么位置?
启动生产者,并发送消息。

kafka-console-producer.sh --bootstrap-server hadoop100:9092 --topic first
>hello world

查看 hadoop102(或者 hadoop103、hadoop104)的/opt/module/kafka-3.0.0/datas/first-1 (first-0、first-2)路径上的文件。

ll /opt/module/kafka-3.0.0/datas/first-1
# 执行正常显示
-rw-rw-r-- 1 cm cm 10485760 2月  27 11:43 00000000000000000000.index
-rw-rw-r-- 1 cm cm      667 2月  27 11:43 00000000000000000000.log
-rw-rw-r-- 1 cm cm 10485756 2月  27 11:43 00000000000000000000.timeindex
-rw-rw-r-- 1 cm cm       13 2月  27 12:05 leader-epoch-checkpoint
-rw-rw-r-- 1 cm cm       43 2月  27 11:43 partition.metadata

直接查看 log 日志,发现是乱码。

cat /opt/module/kafka-3.0.0/datas/first-1/00000000000000000000.log
\CYnF|©|©ÿÿÿÿÿÿÿÿÿÿÿÿÿÿ"hello world

通过工具查看 index 和 log 信息。

cd /opt/module/kafka-3.0.0/datas/first-1
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index 
# 执行正常显示
offset: 0 position: 0
Mismatches in :/opt/module/kafka-3.0.0/datas/first-1/./00000000000000000000.index
  Index offset: 0, log offset: 1
  
kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
Dumping ./00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 1 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1677423438331 size: 81 magic: 2 compresscodec: none crc: 1132630391 isvalid: true
baseOffset: 2 lastOffset: 3 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 81 CreateTime: 1677423438340 size: 81 magic: 2 compresscodec: none crc: 4175274034 isvalid: true
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 162 CreateTime: 1677423438343 size: 71 magic: 2 compresscodec: none crc: 2382795792 isvalid: true
baseOffset: 5 lastOffset: 7 count: 3 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 233 CreateTime: 1677423456391 size: 91 magic: 2 compresscodec: none crc: 1439649198 isvalid: true
baseOffset: 8 lastOffset: 9 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 324 CreateTime: 1677423456398 size: 81 magic: 2 compresscodec: none crc: 4229859629 isvalid: true
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 405 CreateTime: 1677423500141 size: 73 magic: 2 compresscodec: none crc: 1899802602 isvalid: true
baseOffset: 11 lastOffset: 15 count: 5 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 5 isTransactional: false isControl: false position: 478 CreateTime: 1677464904621 size: 111 magic: 2 compresscodec: none crc: 2446558136 isvalid: true
baseOffset: 16 lastOffset: 16 count: 1 baseSequence: -1 lastSequence: -1 producerId: 0 producerEpoch: 2 partitionLeaderEpoch: 5 isTransactional: true isControl: true position: 589 CreateTime: 1677465215911 size: 78 magic: 2 compresscodec: none crc: 253418711 isvalid: true

index 文件和 log 文件详解
image-1677468205325

日志存储参数配置

参数 描述
log.segment.bytes Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。
log.index.interval.bytes 默认 4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 稀疏索引。

文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间。

那么日志一旦超过了设置的时间,怎么处理呢?
Kafka 中提供的日志清理策略有 delete 和 compact 两种。

  • delete 日志删除:将过期数据删除
    • log.cleanup.policy = delete 所有数据启用删除策略
      • 基于时间:默认打开。以 segment 中所有记录中的最大时间戳作为该文件时间戳。
      • 基于大小:默认关闭。超过设置的所有日志总大小,删除最早的 segment。log.retention.bytes,默认等于-1,表示无穷大。
        思考: 如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?
        image-1677468225351
  • compact 日志压缩
    image-1677468238086

高效读写数据

  • Kafka 本身是分布式集群,可以采用分区技术,并行度高

  • 读数据采用稀疏索引,可以快速定位要消费的数据

  • 顺序写磁盘
    Kafka 的 producer 生产数据,要写入到 log 文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到 600M/s,而随机写只有 100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。
    image-1677468255932

  • 页缓存 + 零拷贝技术
    image-1677468267378

参数 描述
log.flush.interval.messages 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。
log.flush.interval.ms 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理

Kafka 消费者

Kafka 消费方式

image-1677500797278

Kafka 消费者工作流程

消费者总体工作流程

image-1677500816147

消费者组原理

image-1677500828452

image-1677500836935

image-1677500841477

image-1677500852257

消费者重要参数

参数名称 描述
bootstrap.servers 向 Kafka 集群建立初始连接用到的 host/port 列表。
key.deserializer 和value.deserializer 指定接收消息的 key 和 value 的反序列化类型。一定要写全类名。
group.id 标记消费者所属的消费者组。
enable.auto.commit 默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。
auto.offset.reset 当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。
offsets.topic.num.partitions __consumer_offsets 的分区数,默认是 50 个分区。
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms也不应该高于session.timeout.ms 的 1/3。
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
fetch.min.bytes 默认 1 个字节。消费者获取服务器端一批消息最小的字节数。
fetch.max.wait.ms 默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。
fetch.max.bytes 默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条

消费者 API

独立消费者案例(订阅主题)

需求

创建一个独立消费者,消费 first 主题中数据。
image-1677500919918
注意: 在消费者 API 代码中必须配置消费者组 id。命令行启动消费者不填写消费者组id 会被自动填写随机的消费者组 id。

实现步骤

package ltd.cmjava.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {

    public static void main(String[] args) {

        // 配置
        Properties properties = new Properties();

        // 连接 bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");

        // 设置分区分配策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");

        // 1 创建一个消费者  "", "hello"
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅主题 first
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3 消费数据
        while (true){

            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }

            kafkaConsumer.commitAsync();
        }
    }
}

测试

  • 在 IDEA 中执行消费者程序。
  • 在 Kafka 集群控制台,创建 Kafka 生产者,并输入数据。
kafka-console-producer.sh --bootstrap-server hadoop100:9092 --topic first
>hello
  • 在 IDEA 控制台观察接收到的数据。
ConsumerRecord(topic = first, partition = 1, leaderEpoch = 22, offset = 20, CreateTime = 1677505762405, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello)

独立消费者案例(订阅分区)

需求

创建一个独立消费者,消费 first 主题 0 号分区的数据。
image-1677500984244

实现步骤

package ltd.cmjava.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerPartition {

    public static void main(String[] args) {
        // 配置
        Properties properties = new Properties();

        // 连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        // 1 创建一个消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅主题对应的分区
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("first",0));
        kafkaConsumer.assign(topicPartitions);

        // 3 消费数据
        while (true){

            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

测试

  • 在 IDEA 中执行消费者程序。
  • 在 IDEA 中执行生产者程序 CustomProducerCallback()在控制台观察生成几个 0 号分区的数据。
主题: first 分区: 0
主题: first 分区: 2
主题: first 分区: 2
...
  • 在 IDEA 控制台,观察接收到的数据,只能消费到 0 号分区数据表示正确。
ConsumerRecord(topic = first, partition = 0, leaderEpoch = 26, offset = 20, CreateTime = 1677505959577, serialized key size = -1, serialized value size = 4, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = cm24)
...

消费者组案例

需求

测试同一个主题的分区数据,只能由一个消费者组中的一个消费。
image-1677501050796

案例实操

  • 在 IDEA 中同时启动多个消费者,也就是上述独立消费者案例 里面的消费者
package ltd.cmjava.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {

    public static void main(String[] args) {

        // 配置
        Properties properties = new Properties();

        // 连接 bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test5");

        // 设置分区分配策略
        properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");

        // 1 创建一个消费者  "", "hello"
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅主题 first
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3 消费数据
        while (true){

            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }

            kafkaConsumer.commitAsync();
        }
    }
}

image-1677506245447

  • 启动代码中的CustomProducerCallback发送消息,在 IDEA 控制台即可看到两个消费者在消费不同分区的数据

  • 如果重新发送到一个全新的主题中,由于默认创建的主题分区数为 1,可以看到只能有一个消费者消费到数据。

生产经验——分区的分配以及再平衡

image-1677506450508

参数名称 描述
heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。该条目的值必须小于 session.timeout.ms也不应该高于session.timeout.ms 的 1/3。
session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超过该值,该消费者被移除,消费者组执行再平衡。
max.poll.interval.ms 消费者处理消息的最大时长,默认是 5 分钟。超过该值,该消费者被移除,消费者组执行再平衡。
partition.assignment.strategy 消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range + CooperativeSticky。Kafka 可以同时使用多个分区分配策略。可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky

Range 以及再平衡

Range 分区策略原理

image-1677506586994

Range 分区分配策略案例

  • 修改主题 first 为 7 个分区。
# 注意:分区数可以增加,但是不能减少。
kafka-topics.sh --bootstrap-server hadoop100:9092 --alter --topic first --partitions 7
  • IDEA启动 3个 CustomConsumer 类。组名都为“test”,同时启动 3 个消费者。
  • 启动 CustomProducerCallback 生产者,发送 500 条消息,随机发送到不同的分区
package ltd.cmjava.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallback {

    public static void main(String[] args) throws InterruptedException {

        // 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 500; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "cm" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null){
                        System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

说明: Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。

  • 观看 3 个消费者分别消费哪些分区的数据。

Range 分区分配再平衡案例

  • 停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。 1 号消费者:消费到 3、4 号分区数据。 2 号消费者:消费到 5、6 号分区数据。 0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
    说明: 0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
  • 再次重新发送消息观看结果(45s 以后)。 1 号消费者:消费到 0、1、2、3 号分区数据。 2 号消费者:消费到 4、5、6 号分区数据。
    说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配

RoundRobin 以及再平衡

RoundRobin 分区策略原理

image-1677506644644

RoundRobin 分区分配策略案例

  • 依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin。
// 修改分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
  • 重启 3 个消费者,重复发送消息的步骤,观看分区结果。

RoundRobin 分区分配再平衡案例

  • 停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
  • 1 号消费者:消费到 2、5 号分区数据
  • 2 号消费者:消费到 4、1 号分区数据
  • 0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,分别由 1 号消费者或者 2 号消费者消费。
    说明: 0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
  • 再次重新发送消息观看结果(45s 以后)。 1 号消费者:消费到 0、2、4、6 号分区数据;2 号消费者:消费到 1、3、5 号分区数据
    说明: 消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配

Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

需求

设置主题为 first,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

步骤

  • 修改分区分配策略为粘性。
    注意: 3 个消费者都应该注释掉,之后重启 3 个消费者,如果出现报错,全部停止等会再重启,或者修改为全新的消费者组。
// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);
  • 使用同样的生产者发送 500 条消息。可以看到会尽量保持分区的个数近似划分分区。

Sticky 分区分配再平衡案例

  • 停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
  • 1 号消费者:消费到 2、5、3 号分区数据。
  • 2 号消费者:消费到 4、6 号分区数据。
  • 0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别由 1 号消费者或者 2 号消费者消费。
    说明: 0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
  • 再次重新发送消息观看结果(45s 以后)。 1 号消费者:消费到 2、3、5 号分区数据。 2 号消费者:消费到 0、1、4、6 号分区数据。
    说明: 消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配。

offset 位移

offset 的默认维护位置

image-1677507470129
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据。key 是 group.id+topic+分区号,value 就是当前 offset 的值。每隔一段时间,kafka 内部会对这个 topic 进行compact,也就是每个 group.id+topic+分区号就保留最新数据。

消费 offset 案例

__consumer_offsets 为 Kafka 中的 topic,那就可以通过消费者进行消费。

  1. 在配置文件 $KAFKA_HOME/config/consumer.properties 中添加配置 exclude.internal.topics=false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为 false。
  2. 采用命令行方式,创建一个新的 topic。
kafka-topics.sh --bootstrap-server hadoop100:9092 --create --topic cm --partitions 2 --replication-factor 2
  1. 启动生产者往 cm 生产数据。
kafka-console-producer.sh --topic cm --bootstrap-server hadoop100:9092
  1. 启动消费者消费 cm 数据。
    kafka-console-consumer.sh --bootstrap-server hadoop100:9092 --topic cm --group test
    注意: 指定消费者组名称,更好观察数据存储位置(key 是 group.id+topic+分区号)。
  2. 查看消费者消费主题__consumer_offsets。
kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server hadoop100:9092 --consumer.config $KAFKA_HOME/config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
# 执行正常可以看到
[test5,first,0]::OffsetAndMetadata(offset=448, leaderEpoch=Optional[26], metadata=, commitTimestamp=1677506870611, expireTimestamp=None)
[test5,first,2]::OffsetAndMetadata(offset=357, leaderEpoch=Optional[22], metadata=, commitTimestamp=1677506871612, expireTimestamp=None)

自动提交 offset

image-1677568299987

参数名称 描述
enable.auto.commit 默认值为 true,消费者会自动周期性地向服务器提交偏移量。
auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。

消费者自动提交 offset

package ltd.cmjava.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerAutoOffset {

    public static void main(String[] args) {

        // 配置
        Properties properties = new Properties();

        // 连接 bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        // 自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        // 提交时间间隔
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);

        // 1 创建一个消费者  "", "hello"
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅主题 first
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3 消费数据
        while (true){

            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

测试

kafka-console-producer.sh --topic first --bootstrap-server hadoop100:9092

手动提交 offset

image-1677568925818

同步提交 offset

由于同步提交 offset 有失败重试机制,故更加可靠,但是由于一直等待提交结果,提交的效率比较低。以下为同步提交 offset 的示例

package ltd.cmjava.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerByHandSync {

    public static void main(String[] args) {

        // 配置
        Properties properties = new Properties();

        // 连接 bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        // 手动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 1 创建一个消费者  "", "hello"
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅主题 first
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3 消费数据
        while (true){

            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }

            // 手动同步提交offset
            kafkaConsumer.commitSync();
        }
    }
}

测试

kafka-console-producer.sh --topic first --bootstrap-server hadoop100:9092

异步提交 offset

虽然同步提交 offset 更可靠一些,但是由于其会阻塞当前线程,直到提交成功。因此吞吐量会受到很大的影响。因此更多的情况下,会选用异步提交 offset 的方式。以下为异步提交 offset 的示例:

package ltd.cmjava.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumerByHandASync {

    public static void main(String[] args) {

        // 配置
        Properties properties = new Properties();

        // 连接 bootstrap.servers
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        // 手动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

        // 1 创建一个消费者  "", "hello"
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅主题 first
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 3 消费数据
        while (true){

            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }

            // 手动异步提交offset
            kafkaConsumer.commitAsync();
        }
    }
}

测试

kafka-console-producer.sh --topic first --bootstrap-server hadoop100:9092

指定 Offset 消费

auto.offset.reset = earliest | latest | none 默认是 latest。 当 Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

  • earliest:自动将偏移量重置为最早的偏移量,–from-beginning。
  • latest(默认值):自动将偏移量重置为最新偏移量。
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常
    image-1677569169575
  • 任意指定 offset 位移开始消费
package ltd.cmjava.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
import java.util.Set;

public class CustomConsumerSeek {

    public static void main(String[] args) {

        // 配置信息
        Properties properties = new Properties();

        // 连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"+ UUID.randomUUID());

        // 1 创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 指定位置进行消费
        Set<TopicPartition> assignment = kafkaConsumer.assignment();

        //  保证分区分配方案已经制定完毕
        while (assignment.size() == 0){
            kafkaConsumer.poll(Duration.ofSeconds(1));
            // 获取消费者分区分配信息(有了分区分配信息才能开始消费)
            assignment = kafkaConsumer.assignment();
        }

        // 指定消费的offset
        for (TopicPartition topicPartition : assignment) {
            kafkaConsumer.seek(topicPartition,600);
        }

        // 3  消费数据
        while (true){

            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

                System.out.println(consumerRecord);
            }
        }
    }
}

测试

kafka-console-producer.sh --topic first --bootstrap-server hadoop100:9092

注意: 每次执行完,需要修改消费者组名;

指定时间消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

package ltd.cmjava.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class CustomConsumerSeekTime {

    public static void main(String[] args) {

        // 配置信息
        Properties properties = new Properties();

        // 连接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092,hadoop101:9092");

        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"+ UUID.randomUUID());

        // 1 创建消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 订阅主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);

        // 指定位置进行消费
        Set<TopicPartition> assignment = kafkaConsumer.assignment();

        //  保证分区分配方案已经制定完毕
        while (assignment.size() == 0){
            kafkaConsumer.poll(Duration.ofSeconds(1));

            assignment = kafkaConsumer.assignment();
        }

        // 希望把时间转换为对应的offset
        HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();

        // 封装对应集合
        for (TopicPartition topicPartition : assignment) {
            topicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
        }

        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);

        // 指定消费的offset
        for (TopicPartition topicPartition : assignment) {

            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);

            kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
        }

        // 3  消费数据
        while (true){

            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

                System.out.println(consumerRecord);
            }
        }
    }
}

测试

kafka-console-producer.sh --topic first --bootstrap-server hadoop100:9092

漏消费和重复消费

  • 重复消费:已经消费了数据,但是 offset 没提交。
  • 漏消费:先提交 offset 后消费,有可能会造成数据的漏消费。
    image-1677569288930
    思考: 怎么能做到既不漏消费也不重复消费呢?详看消费者事务。

生产经验——消费者事务

image-1677569368328

生产经验——数据积压(消费者如何提高吞吐量)

image-1677569381697

参数名称 描述
fetch.max.bytes 默认 Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响。
max.poll.records 一次 poll 拉取数据返回消息的最大条数,默认是 500 条
0

评论区