Ezekielx
Ezekielx
发布于 2025-12-01 / 6 阅读
0
0

FlumeKafkaSqoop Chapter 3:Kafka Advanced Features and Applications(Kafka 的高级特性与应用)

一、Kafka Producer

Kafka Producer 是一个向 Kafka 集群发送消息的客户端程序。它负责以消息的形式将数据流发布到指定的主题中。

1、Working Principle of Kafka Producer(Kafka Producer 的工作原理)

Producer 以推送(push)模式工作,将消息按顺序追加并写入 Broker 中的特定分区。这种顺序写入磁盘的方式极大地提升了效率,并确保了 Kafka 的高吞吐量。每条消息在分区中都有一个唯一的偏移量(offset)值,用于保证消息的顺序性。

8ZFAn6YT-1.png

分区机制是 Kafka Producer 的核心特性之一。它不仅提升了系统的并发处理能力,还通过分区扩展了集群的数据处理能力。Producer 会根据消息键(key)的哈希值或轮询算法将消息分配到不同的分区,以确保负载均衡。消息写入过程是 Kafka Producer 的另一关键部分。Kafka Producer 的消息写入流程如下图所示。

8ZFAn6YT-2.png

构建 ProducerRecord

ProducerRecord 是 Kafka 中用于封装消息数据的类,其中包含消息主题、可选的分区键、消息内容本身以及时间戳等信息。在构建 ProducerRecord 时,开发者需要指定消息的 topic 和 value,并可在需要时指定 key 和 timestamp。

key 的存在使 Kafka 能依据 key 的哈希值将消息均匀分布到不同分区中,从而保证具有相同 key 的消息能够按顺序存放在同一个分区内。

调用 send 方法发送消息

send 方法本身是异步的,调用后会立即返回一个 Future 对象。

该 Future 对象包含发送结果或异常信息。用户可以通过设置回调函数(callback)来处理消息发送成功的确认或失败的错误。

如果希望改为同步发送模式,可以在调用 send() 方法后接着调用 get() 方法,此时程序将阻塞直到 Kafka 返回响应。

消息序列化(Serialization)

接下来是消息的序列化过程。序列化是将消息的 key 和 value 转换为字节数组的过程。Kafka 允许开发者自定义序列化器,也可以使用 Kafka 提供的默认序列化器。

序列化后的消息将被放入 Producer 的缓冲区中等待发送。该缓冲区用于临时存储待发送的消息,并以异步、批量的方式将消息发送到 broker,从而减少网络请求次数并进一步提升消息发送效率。

分区选择

Producer 会根据分区策略选择消息应该发送到哪个分区。

  • 如果消息指定了分区 key,则 Producer 会根据 key 的哈希值来决定分区;
  • 如果未指定 key,Producer 将使用轮询策略(Round-Robin)或其他自定义分区策略来选择分区。

Broker 写入消息

消息发送到 broker 后,broker 将根据配置的存储策略将消息写入日志。

  • Leader 节点首先将消息写入本地日志;
  • Follower 节点会从 Leader 拉取消息并写入自身日志,然后向 Leader 发送 ACK;
  • 当 Leader 收到所有 ISR(In Sync Replicas,同步副本集)中的 Follower 的 ACK 后,会更新 HW(High Watermark),并向 Producer 发送 ACK,表示消息已安全写入。

消息的磁盘存储

每个分区都对应一个目录,该目录保存该分区的所有消息文件和索引文件。消息以追加写入(append)的方式写入这些文件。

可以通过 Kafka 的日志保留策略(log retention)来管理消息的存储生命周期,并控制磁盘空间使用,例如基于时间或空间的删除策略。

8ZFAn6YT-3.png

如果在消息发送过程中发生故障,例如网络问题或 Leader 节点不可用,Producer 会根据配置的重试策略进行重试。但重试可能增加消息重复的风险。自 0.11 版本以来,Kafka 引入了幂等生产者(Idempotent Producer)和事务性生产者(Transactional Producer),用于降低消息重复或不一致的问题。

**幂等生产者(Idempotent Producer)**能够确保即使消息被重新发送,也不会出现数据重复。

**事务性生产者(Transactional Producer)**则保证消息发送的原子性:要么所有消息都成功发送,要么在发生错误时全部不发送。

整个过程体现了 Kafka Producer 的健壮性与灵活性。Kafka 通过序列化、分区、发送和重试机制来确保消息的可靠传输与存储。

2、Introduction to Kafka Producer Java API(Kafka Producer Java API 简介)

Kafka 的 Producer Java API 提供了一套丰富的工具和方法,使应用程序能够高效地向 Kafka 集群发布消息。

要使用 Kafka Producer,可以添加以下 Maven 依赖:

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>4.1.0</version>
</dependency>

Kafka Java Producer API 的核心组件与功能:

KafkaProducer<K, V>:

KafkaProducer<K, V> 是 Kafka 生产者客户端的核心类,用于向 Kafka 集群发布消息记录。该类为泛型,可指定 key(K) 和 value(V) 的类型。

KafkaProducer 的常用构造函数包括:

  • KafkaProducer(Map<String, Object> configs):通过配置 Map 实例化 Producer。
  • KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer):指定配置和序列化器。
  • KafkaProducer(Properties properties):使用 Properties 对象初始化 Producer。
  • KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer):结合 Properties 与自定义序列化器。

KafkaProducer 的关键方法包括:

  • abortTransaction():终止正在进行的事务。
  • beginTransaction():开启一个新的事务。
  • close():关闭 Producer。
  • close(Duration timeout):等待指定时间处理完未完成请求后关闭。
  • commitTransaction():提交事务。
  • flush():立即发送缓冲区中所有消息。
  • send(ProducerRecord<K, V> record):异步发送消息。
  • send(ProducerRecord<K, V> record, Callback callback):异步发送消息并执行回调。

ProducerRecord<K, V>:

ProducerRecord<K, V> 用于封装要发送到 Kafka 的 key/value 消息记录,包括主题名称、分区号(可选)、key 和 value。

分区选择方式如下:

  • 指定分区号时,将直接发送至该分区。
  • 未指定分区但包含 key 时,根据 key 的 hash 值选择分区。
  • 既未指定分区也无 key 时,Kafka 按轮询方式为消息分配分区。

ProducerRecord 的构造函数包括:

  • ProducerRecord(String topic, Integer partition, K key, V value):创建指定 topic 和 partition 的记录。
  • ProducerRecord(String topic, K key, V value):创建带 key 的记录。
  • ProducerRecord(String topic, V value):创建无 key 的记录。

ProducerRecord 的关键方法包括:

  • headers():获取消息头。
  • key():获取 key。
  • value():获取消息值。
  • partition():获取消息目标分区,未指定则为 null。
  • timestamp():获取时间戳。
  • topic():获取消息所在的 topic。

ProducerConfig 类提供了一系列用于设置 Kafka Producer 的配置字段。以下是一些常见的配置项:

字段(Field) 常用配置项(common configuration items) 说明(Explanation)
ACKS_CONFIG acks 控制消息确认级别。例如 “all” 表示只有所有副本都成功确认后才认为消息已写入。
BOOTSTRAP_SERVERS_CONFIG bootstrap.servers 指定 Kafka 集群地址列表,用于初始化 producer,例如 “localhost:9092”。
BUFFER_MEMORY_CONFIG buffer.memory 控制 producer 缓冲区总内存大小(字节)。
KEY_SERIALIZER_CLASS_CONFIG key.serializer 指定 key 的序列化器类,例如 “org.apache.kafka.common.serialization.StringSerializer”。
MAX_REQUEST_SIZE_CONFIG max.request.size 控制单个请求的最大大小(字节)。
PARTITIONER_CLASS_CONFIG partitioner.class 指定分区器(Partitioner)的类全名,用于决定消息的分区逻辑。
RECEIVE_BUFFER_CONFIG receive.buffer.bytes 控制网络接收缓冲区大小(字节)。
REQUEST_TIMEOUT_MS_CONFIG request.timeout.ms 控制请求的超时时间(毫秒)。
RETRIES_CONFIG retries 控制发送失败时的重试次数。
RETRY_BACKOFF_MS_CONFIG retry.backoff.ms 控制重试前等待的时间(毫秒)。
SEND_BUFFER_CONFIG send.buffer.bytes 控制网络发送缓冲区大小(字节)。
TRANSACTION_TIMEOUT_CONFIG transaction.timeout.ms 控制事务的超时时间(毫秒)。
VALUE_SERIALIZER_CLASS_CONFIG value.serializer 指定 value 的序列化器类,例如 “org.apache.kafka.common.serialization.StringSerializer”。

这些配置项共同决定 Kafka Producer 的行为,包括:

  • 消息序列化方式
  • 数据持久化保障等级
  • 网络请求的超时策略
  • 内存使用限制等

开发者应根据实际需求与部署环境合理设置这些参数。

常见 Kafka Producer 配置说明:

  • key.serializer.class:用于把消息的 key 转成字节数据 的类。Kafka 传输的都是字节,因此必须指定一个序列化器。这个类必须实现 Serializer 接口。

  • value.serializer.class:用于把消息的 value 转成字节数据 的类,同样必须实现 Serializer 接口。

  • acks:用来控制 Producer 发送消息时,需要 Kafka 返回多少确认才算成功。

    • acks=0:发完就算成功,不等确认 → 速度最快,但可能丢消息
    • acks=1:只要 Leader 写入成功就算成功 → 较快,但 Leader 崩了可能丢消息
    • acks=all 或 -1:所有同步副本都写入才算成功 → 最安全,最可靠
  • bootstrap.servers:Kafka 集群的地址列表。Producer 先连接这里获取元数据,不需要写所有 broker,只要写几个能连通的节点即可。

  • buffer.memory:Producer 内部用来缓存消息的总内存大小。如果消息生产太快超出内存限制,Producer 会阻塞,直到有空间或者超时。

  • compression.type:指定消息的压缩方式。常见选项包括:nonegzipsnappylz4zstd。压缩会在一个批次(batch)的消息上一起进行,因此 批次越大,压缩效率越高

  • max.block.ms:当 Producer 的缓冲区满了,或者还没拿到 Kafka 元数据时,send() 方法最多允许被阻塞的时间。超过这个时间就会抛出异常。

  • max.request.size:限制 Producer 单个请求的最大字节数。它控制一次最多能发送多少消息,也限制未压缩的消息批次大小。

  • request.timeout.ms:Producer 发送一个请求后,最长等待 Kafka 响应的时间。超过这个时间 Producer 会重试;重试多次后仍失败,就会抛出异常。

    设置这个值时应该大于 broker 端的 replica.lag.time.max.ms,这样可以避免因为 follower 同步慢而产生不必要的重试。

3、Using the Kafka Producer Java API(使用 Kafka Producer Java API)

使用 Kafka Producer 向主题发送消息的步骤如下:

  1. 配置 Producer:创建一个 Properties 对象,并设置必要的 Kafka Producer 配置,例如:
    • bootstrap.servers(Kafka 集群地址),
    • key.serializer 和 value.serializer(key 和 value 的序列化类名),
    • acks(消息确认级别)等。
  2. 创建 KafkaProducer 实例:使用配置好的 Properties 对象实例化 KafkaProducer 类,并指定消息 key 和 value 的类型参数。
  3. 创建消息:使用 new ProducerRecord<K, V>(topic, key, value) 创建 ProducerRecord 对象,其中 topic 是目标主题名称,key 和 value 分别为消息的键和值。
  4. 发送消息:调用 KafkaProducer 的 send() 方法发送消息。该方法是异步的,会立即返回一个 Future 对象,用于表示发送操作的完成情况。
  5. 处理回调(可选):如果需要在消息发送后获取确认信息或处理异常,可以为 send() 方法提供回调函数。当消息成功发送或发生异常时,回调将被触发。
  6. 同步发送(可选):如果需要同步方式发送消息,可以在调用 send() 后,对返回的 Future 调用 get() 方法,使发送操作变为同步阻塞,直到 Kafka 返回结果。
  7. 刷新缓冲区(可选):Kafka Producer 使用缓冲区提高发送效率,可调用 flush() 方法确保所有待发送消息立即写出,即使尚未达到 batch.size 配置的批次大小。
  8. 关闭 Producer:在发送完所有消息后,调用 KafkaProducer 的 close() 方法关闭生产者并释放相关资源。

KafkaProducer 是线程安全的。在不同线程之间共享一个 Producer 实例通常比为每个线程创建多个实例更高效。

下面是一个使用 Producer 发送消息的简单示例代码:

导入所需包:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

编写代码:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerExample {
    public static void main(String[] args) {

        // 创建配置对象,用于设置 Kafka Producer 的各项参数
        Properties props = new Properties();

        // 配置 Kafka 集群地址(可以写多个,用逗号分隔)
        // Producer 会使用这个地址建立初始连接
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");

        // 配置 Key 的序列化器——必须是字符串(ClassName)
        // Kafka 只能发送字节数组,序列化器负责把 String 转成 byte[]
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 配置 Value 的序列化器
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 创建 KafkaProducer 实例
        // <String, String> 表示 key 和 value 都是字符串
        Producer<String, String> producer = new KafkaProducer<>(props);

        try {
            // 发送 100 条消息,key 和 value 都为简单字符串
            for (int i = 0; i < 100; i++) {
                // 创建一条消息 ProducerRecord:
                // "my-topic":发送的主题
                // "key" + i:消息 key
                // "val" + i:消息 value
                ProducerRecord<String, String> record = new ProducerRecord<>("myTopic", "key" + i, "val" + i);

                producer.send(record);
            }
            // flush() 会将缓冲区中剩余消息立即发送
            producer.flush();
        } finally {
            // 关闭 producer 客户端
            producer.close();
        }
    }
}

这段代码创建了一个 Kafka Producer,向名为 “myTopic” 的主题异步发送 100 条包含整数 key 和 value 的消息。由于 acks 设置为 0,Producer 无需等待 broker 的确认。最后关闭 Producer 资源。

注意,使用 send() 发送消息是异步的。如果希望监控消息发送情况(例如成功后获取分区和位移信息,或失败后打印异常),可以调整 send() 的调用方式,传入一个匿名类或 lambda 表达式实现 Callback 接口。示例代码如下:

producer.send(new ProducerRecord<String, String>("myTopic", "key" + i, "val" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            // 处理异常
                            System.err.println("发送消息时发生错误:" + e.getMessage());
                        } else {
                            // 消息发送成功,打印消息元数据
                            System.out.println("消息发送成功:" + "key: " + record.key() + ",offset: " + metadata.offset() + ",partition: " + metadata.partition() + ",topic: " + metadata.topic());
                        }
                    }
                });

如果希望逐条确认消息是否发送成功,也可以选择同步发送。同步发送意味着线程会阻塞,直到 kafka 集群接收到并确认该消息。示例:

// 同步发送消息并等待确认
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>("myTopic", "key" + i, "val" + i)).get();

在这段代码中,producer.send(record).get() 会一直阻塞到服务器返回 ACK 为止。RecordMetadata 对象包含消息的 offset、分区编号及 key 等信息。

同步发送模式可以确保每条消息都成功发送,也可以立即处理确认信息。但这种方式会降低 Producer 的性能,因为它限制了发送操作的并行度。

二、Kafka Consumer

Kafka Consumer API 是一个 Kafka 客户端程序,用于订阅主题并从 Kafka 集群中消费消息。Kafka 的 Consumer API 支持多种编程语言,包括 Java、C、Python 等,使开发者可以根据自身的技术偏好和项目需求选择合适的语言来实现消费者。不论是需要快速响应的实时系统,还是要求稳定传输的批处理系统,Kafka 消费者都能提供高效、可靠的数据流处理能力。

1、 HowKafka Consumers Work(Kafka Consumers 的工作原理)

消费者通过订阅特定的主题,从 Kafka 集群中使用或读取数据。它们会与 Kafka 集群中的各个 Broker 进行交互以获取数据。每个消费者都明确知道应当从哪个 Broker 读取数据,并会按照分区中消息的 offset 顺序读取,保证消息的有序性。
这意味着消费者只有在读取了 offset 为 0 的消息后,才能继续读取 offset 为 1 的消息,以此类推,从而保持消息顺序的一致性。

此外,消费者还具备并行从多个 Broker 读取数据的能力,从而提升读取操作的灵活性和效率。例如,在一个 Kafka 集群中,Consumer 1 可能按顺序从 Broker 1 读取数据,而 Consumer 2 可能同时按顺序从 Broker 2 和 Broker 3 读取数据。这种设计使消费者能够根据集群拓扑和负载情况有效分配读取任务,从而实现高吞吐和负载均衡。

通过这种方式,Kafka 消费者能够在保持数据完整性与顺序性的同时,从集群中高效地读取数据。

8ZFAn6YT-4.png

2、 Consumer Group

Consumer Group(消费者组) 是 Kafka 中的一个逻辑概念,表示一组消费者共同订阅并消费一组主题。组内的消费者协同工作,确保每个主题分区中的消息只会被组内的 一个消费者 消费,从而避免消息被重复处理。

在一个消费者组中,每个主题的所有分区都会被分配给该组中的物理消费者。分区与消费者之间是一种 多对一 的关系:

  • 每个分区只会被一个消费者处理(保证消息不重复消费),
  • 但一个消费者可以处理多个分区

属于同一个消费者组的消费者可以分布在不同的主机上,以分布式方式共同工作,完成消息的消费任务。当单个消费者无法实时处理生产者发送到某个主题的大量消息时,可以通过 增加消费者数量 来提升并行处理能力。

消费者组通过 group.id 来标识。在 Kafka 客户端配置中设置相同的 group.id,即可将多个消费者实例归属到同一个消费者组中。

如果创建消费者时 未指定 group.id,该消费者就被视为 独立消费者,必须自行消费所订阅主题的 所有分区。在大多数生产环境中,推荐始终为 Kafka 消费者设置 group.id,以利用消费者组带来的负载均衡、扩展能力和容错性。

例如,假设有一个名为 T1 的 Kafka 主题,并且它有 四个分区

当消费者组中只有 1 个消费者时,这个消费者将独立消费 T1 主题下所有四个分区的消息。由于组内没有其他消费者,它必须承担所有分区的消费任务,因此会处理所有负载。在高吞吐量场景中,这种情况可能会造成性能瓶颈。

8ZFAn6YT-5.png

如果消费者组中有 2 个消费者,它们会通过 Kafka 的内部分区分配机制协同消费 T1 主题的四个分区。每个消费者将负责消费其中两个分区,而具体由哪个消费者消费哪些分区,则由 Kafka 动态决定。这样的分配方式实现了基本的负载均衡,使两个消费者能够共同分担消息消费任务。

8ZFAn6YT-6.png

如果消费者组中有 5 个消费者,而主题只有 4 个分区,那么 Kafka 会将这 4 个分区分配给其中的 4 个消费者,使每个分区都有对应的消费者进行消费。剩下的第 5 个消费者则不会被分配到任何分区,也就没有消费任务。在这种情况下,资源无法被充分利用,因为有一个消费者实例处于空闲状态,可能会造成资源浪费和不必要的开销。

8ZFAn6YT-7.png

假设主题 T1 有四个分区,并且存在两个消费者组:消费者组 1 有四个消费者,消费者组 2 有两个消费者。这两个消费者组会彼此独立地消费 T1 主题的消息。每个消费者组内部的消费者会相互协作,共同消费分配给该组的分区。而不同消费者组之间互不影响,它们会并行处理消息。但需要注意的是,同一消费者组内部必须进行协调,以避免在同一组内出现对同一条消息的重复消费。

8ZFAn6YT-8.png

3、ebalancing process(再均衡过程)

在 Kafka 中,消费者组通过再均衡(Rebalancing)机制动态管理分区的归属分配。再均衡是 Kafka 消费者组的关键机制,它允许将分区的所有权从一个消费者转移到另一个消费者。当消费者组成员发生变化时,再均衡可以实现负载的重新分配,这对于保持消费者组的高可用性和可扩展性至关重要。

触发再均衡的事件包括:

  • 扩容消费者组: 新的消费者加入消费者组时。
  • 缩减消费者组: 某个消费者离开或被移除时。
  • 主题分区数量变化: 当订阅的主题新增分区时。
  • 应用崩溃: 消费者崩溃导致其原有分区需要重新分配。
  • 消费者正常关闭: 消费者关闭后,其分区需要重新分配。
  • 会话超时: 消费者在会话超时时间内未发送心跳,被视为不活跃。
  • 订阅主题列表变化: 当消费者订阅多个主题且列表发生变化时。

再均衡的基本流程:

  1. 当消费者加入一个消费者组时,它会向组协调器(Group Coordinator)发送 JoinGroup 请求。
  2. 组协调器会从所有消费者中选出一个作为 组长(Group Leader)
  3. 组长决定新的分区分配方案,将其发送给位于 broker 的组协调器,然后组协调器再把最终的分配结果通知给所有消费者。

8ZFAn6YT-9.png

影响再均衡过程的三个关键配置:

  • rebalance.backoff.ms
    在一次再均衡尝试失败后,下一次尝试前需要等待的时间(毫秒)。
    如果消费者在再均衡过程中遇到错误,会等待此时间后再尝试重新再均衡。
  • rebalance.max.retries
    指定允许的再均衡最大重试次数。如果超过该次数仍未成功,消费者将放弃再均衡并抛出 ConsumerRebalanceFailedException 异常。
    再均衡失败的可能原因包括网络问题、权限问题、消费者处理速度过慢等。
  • zookeeper.session.timeout.ms(仅 Kafka 0.10 以前版本)
    对于使用 Zookeeper 作为协调器的旧版本 Kafka,此配置定义 Zookeeper 会话超时时间。
    如果消费者在该时间内未向 Zookeeper 发送心跳,会被认为“死亡”,从而触发再均衡。
    在 Kafka 0.10 及以后版本中,已改用 Kafka 自身的 Group Coordinator,因此该参数不再使用。

再均衡的重要性与影响:

虽然分区再均衡对消费者组的稳定性和负载均衡非常重要,但在正常情况下应尽量减少再均衡的发生。这是因为在再均衡过程中,消费者组的成员会出现短暂的不可用窗口,在这段时间内它们无法读取新的消息。

通过合理的配置与监控,可以降低再均衡频率并优化其影响,从而确保 Kafka 消费者组高效、稳定地运行。

4、 Offset management(偏移量管理)

Offset(偏移量) 是 Kafka 中非常重要的概念,它表示消费者组在每个分区中的消费进度。每条消息在 Kafka 分区中都有一个从 0 开始递增的 offset。通过 offset,消费者可以定位分区中的具体消息,从而保证消息按顺序被消费。提交 offset 的方式由消费者决定,它就像书中的“书签”,Kafka 通过偏移量知道消费者读到了哪里,从而维护消费状态。

Kafka 使用以下三种投递语义来定义消息的处理方式:

  • At most once(至多一次):这是默认语义。这种模式下,消息可能会丢失但不会重复处理。消费者在接收到消息后立即提交 offset,如果消息在处理过程中出错,将不会再次处理。
  • At least once(至少一次):在这种模式下,消费者在处理完消息后才提交 offset。这意味着如果提交 offset 失败,消费者重启或再均衡后可能会再次处理同一条消息,因此可能出现重复消费。该模式保证消息不会丢失,但可能重复。
  • Exactly once(精确一次):确保每条消息只被处理一次,即使消费者或 Kafka 集群发生故障。这是最强的投递保证,但实现最复杂。Kafka 通过幂等生产者和事务来实现精确一次语义。消费者需要配合幂等或事务型生产者,确保消息不会重复处理。

如果消费者发生故障或重启,它可以从上一次提交的 offset 继续读取消息,从而实现故障恢复。Kafka 会将消费者组的 offset 存储在内部主题 __consumer_offsets 中。

默认情况下,消费者在成功消费消息后会自动提交 offset,是否允许自动提交可以通过 enable.auto.commit 配置项控制。消费者也可以选择手动提交 offset,以获得更高的控制能力。在处理完消息后,消费者可以显式调用同步提交 commitSync() 或异步提交 commitAsync() 方法来提交偏移量。

8ZFAn6YT-10.png

5、Kafka Consumer Java API Introduction(Kafka Consumer Java API 简介)

Kafka 中的消费者 API 分为高级和低级两类。大部分场景推荐使用高级消费者 API。下面介绍 Kafka Consumer Java API 的常用类与方法。

KafkaConsumer:

用于从 Kafka 集群消费消息,处理分区迁移、broker 故障,并支持消费者组。该类是 非线程安全 的。

KafkaConsumer 构造函数:

  • KafkaConsumer(Map<String, Object> configs):使用一个配置 Map 创建消费者,适合程序中直接构建配置键值对。
  • KafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer):除了配置 Map,还显式指定 key 和 value 的反序列化器。
  • KafkaConsumer(Properties properties):使用 Properties 对象创建消费者,更适合从配置文件加载配置。
  • KafkaConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer):同时使用 Properties 和自定义反序列化器创建消费者。

KafkaConsumer 常用方法:

  • seek(TopicPartition partition, long offset):设置指定分区的读取起始偏移量。
  • resume():恢复之前暂停的分区。
  • wakeup():中断消费者的阻塞 poll,常用于安全关闭消费者线程。
  • assignment():获取当前消费者被分配到的所有分区。
  • subscription():获取当前订阅的主题列表。
  • subscribe(List<String> topics, ConsumerRebalanceListener listener):订阅主题并注册再均衡回调监听器。
  • unsubscribe():取消订阅所有主题。
  • subscribe(List<String> topics):订阅主题(传空列表等同于 unsubscribe())。
  • subscribe(Pattern pattern, ConsumerRebalanceListener listener):使用正则表达式订阅主题。
  • assign(List<TopicPartition> partitions):手动分配分区,不依赖消费者组的自动分配。
  • poll(Duration timeout):轮询拉取消息,是消费者获取数据的核心方法。
  • commitSync():同步提交 offset,保证提交成功但会阻塞。

ConsumerRecord:

用于表示从 Kafka 拉取的一条消息记录,包括 topic、partition、offset、timestamp 等信息。

ConsumerRecord 构造函数:

  • ConsumerRecord(String topic, int partition, long offset, K key, V value):最基础的构造方法,适用于 Kafka 0.9。
  • ConsumerRecord(String topic, int partition, long offset, long timestamp, TimestampType timestampType, long checksum, int serializedKeySize, int serializedValueSize, K key, V value):包含时间戳及序列化大小等额外字段,适用于 Kafka 0.10。
  • ConsumerRecord(String topic, int partition, long offset, long timestamp, TimestampType timestampType, Long checksum, int serializedKeySize, int serializedValueSize, K key, V value, Headers headers):增加 headers 信息,用于消息头读取。
  • ConsumerRecord(String topic, int partition, long offset, long timestamp, TimestampType timestampType, Long checksum, int serializedKeySize, int serializedValueSize, K key, V value, Headers headers, Optional<Integer> leaderEpoch):包含 leaderEpoch 等更多元数据,为更高版本 Kafka 提供完整信息。

ConsumerRecord 常用方法:

  • headers():获取消息头部。
  • key():获取消息 key(可能为 null)。
  • leaderEpoch():获取该消息属于的 leader 任期。
  • offset():获取消息偏移量。
  • partition():获取分区 ID。
  • serializedKeySize():获取序列化后 key 的大小。
  • serializedValueSize():获取序列化后 value 的大小。
  • timestamp():获取消息时间戳。
  • timestampType():获取时间戳类型(如 CREATE_TIME 或 LOG_APPEND_TIME)。
  • value():获取消息内容。

ConsumerRecords:

用于存储一次 poll() 调用返回的所有记录批次,按分区进行组织。

ConsumerRecords 构造函数:

  • ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records):使用一个分区到消息列表的 Map 创建 ConsumerRecords 实例。

ConsumerRecords 常用方法:

  • count():返回消息总数。
  • partitions():返回包含数据的分区集合。
  • iterator():以迭代器方式访问所有消息。
  • records(TopicPartition partition):获取指定分区的消息列表。

下面是 ConsumerConfig 类中常见消费者配置项说明:

字段(Field) 常用配置项(common configuration items) 说明(Explanation)
ALLOW_AUTO_CREATE_TOPICS_CONFIG allow.auto.create.topics 是否允许在访问不存在的主题时自动创建主题。
AUTO_COMMIT_INTERVAL_MS_CONFIG auto.commit.interval.ms 自动提交 offset 的时间间隔(毫秒)。
AUTO_OFFSET_RESET_CONFIG auto.offset.reset 当没有可用 offset 时消费者的行为,如 earliest / latest。
BOOTSTRAP_SERVERS_CONFIG bootstrap.servers Kafka 集群的 broker 地址列表。
CLIENT_ID_CONFIG client.id 客户端 ID,用于日志与请求追踪。
CONNECTIONS_MAX_IDLE_MS_CONFIG connections.max.idle.ms Kafka 连接的最大空闲时间。
DEFAULT_FETCH_MAX_BYTES (默认配置项) 默认每个分区可获取的最大字节数。
DEFAULT_ISOLATION_LEVEL (默认配置项) 消费者处理事务消息时的隔离级别。
DEFAULT_MAX_PARTITION_FETCH_BYTES (默认配置项) 每个分区返回的最大数据量(字节)。
ENABLE_AUTO_COMMIT_CONFIG enable.auto.commit 是否启用自动提交 offset。
FETCH_MAX_BYTES_CONFIG fetch.max.bytes 一次从服务端获取的最大数据量(字节)。
FETCH_MAX_WAIT_MS_CONFIG fetch.max.wait.ms poll 请求等待数据的最长时间。
FETCH_MIN_BYTES_CONFIG fetch.min.bytes poll 请求最少返回的数据字节数。
GROUP_ID_CONFIG group.id 消费者组 ID。
GROUP_INSTANCE_ID_CONFIG group.instance.id 消费者实例 ID(用于静态成员)。
HEARTBEAT_INTERVAL_MS_CONFIG heartbeat.interval.ms 消费者发送心跳的时间间隔。
INTERCEPTOR_CLASSES_CONFIG interceptor.classes 拦截器类列表。
ISOLATION_LEVEL_CONFIG isolation.level 事务隔离级别(read_committed 或 read_uncommitted)。
KEY_DESERIALIZER_CLASS_CONFIG key.deserializer key 的反序列化类。
MAX_PARTITION_FETCH_BYTES_CONFIG max.partition.fetch.bytes 每个分区返回数据的最大字节数。
MAX_POLL_INTERVAL_MS_CONFIG max.poll.interval.ms 两次 poll 调用之间允许的最大间隔。
MAX_POLL_RECORDS_CONFIG max.poll.records 单次 poll 返回的最大记录数。
PARTITION_ASSIGNMENT_STRATEGY_CONFIG partition.assignment.strategy 分区分配策略(RoundRobin、Range 等)。
RECEIVE_BUFFER_CONFIG receive.buffer.bytes socket 接收缓冲区大小。
RECONNECT_BACKOFF_MAX_MS_CONFIG reconnect.backoff.max.ms 重连尝试的最大退避时间。
RECONNECT_BACKOFF_MS_CONFIG reconnect.backoff.ms 重连尝试的基础退避时间。
REQUEST_TIMEOUT_MS_CONFIG request.timeout.ms 请求超时时间。
RETRY_BACKOFF_MS_CONFIG retry.backoff.ms 重试请求的退避时间。
SEND_BUFFER_CONFIG send.buffer.bytes socket 发送缓冲区大小。
SESSION_TIMEOUT_MS_CONFIG session.timeout.ms 会话超时时间,超过此时间消费者会被认为离线。
VALUE_DESERIALIZER_CLASS_CONFIG value.deserializer value 的反序列化类。

TopicPartition:

TopicPartition 类表示 主题 + 分区 的组合,用于定位消息所在的位置。包路径:org.apache.kafka.common.TopicPartition

TopicPartition 方法:

  • partition():获取分区编号
  • topic():获取主题名称

6、 Using the Kafka Consumer Java API(使用 Kafka Consumer Java API)

使用高级 Kafka 消费者订阅主题的主要步骤如下:

  1. 创建消费者配置:首先创建一个 Properties 对象,并设置消费者的配置项,至少要包含 bootstrap.servers、group.id、key.deserializer、value.deserializer 等。
  2. 订阅主题列表:使用 subscribe() 方法,并传入一个包含一个或多个主题名称的列表来订阅这些主题。调用该方法后,消费者将开始接收来自这些主题的消息。
  3. 轮询消息:调用 poll() 方法拉取消息。该方法会阻塞,直到有新消息到达或超出指定的超时时间。poll() 返回的 ConsumerRecords 对象包含所有已订阅主题的消息。
  4. 处理消息:遍历 ConsumerRecords 对象并处理每一条记录。可通过 record.topic()record.partition()record.value() 等方法获取消息的元数据和内容。
  5. 提交 offset(可选):消息处理完成后,offset 默认会自动提交。也可以选择手动提交 offset,以更精确地控制消费进度。如果需要手动提交,可以调用 commitSync() 进行同步提交,或调用 commitAsync() 进行异步提交。
  6. 关闭消费者:完成消息消费后,调用 close() 方法关闭消费者并释放资源。

以下是一个使用 Kafka Java Consumer API 的简单示例。首先导入所需包:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;

下面是一个完整的 Kafka Java Consumer 示例程序:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

public class ConsumerExample {
    public static void main(String[] args) {

        // 创建消费者配置对象,并为消费者设置必要的参数
        // bootstrap.servers:指定 Kafka 集群地址
        // group.id:消费者所属的消费者组
        // key.deserializer / value.deserializer:指定如何把消息反序列化成 Java 对象
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myConsumerGroup");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 根据配置创建 KafkaConsumer 实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        // 这里订阅单个主题 “myTopic”
        // subscribe() 会让消费者在消费者组内参与分区分配
        consumer.subscribe(Collections.singletonList("myTopic"));

        try {
            // 不断地从 Kafka 拉取消息
            // poll() 方法会从服务器拉取消息,如果没有消息就阻塞直到超时
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                // 遍历 poll() 返回的所有消息并进行处理
                for (ConsumerRecord<String, String> record : records) {
                    // 打印消息的主题、分区、offset 和消息内容
                    System.out.printf("收到消息:topic %s, partition %d, offset %d: %s%n", record.topic(), record.partition(), record.offset(), record.value());
                }
            }
        } finally {
            // 程序结束时关闭消费者,释放资源
            consumer.close();
        }
    }
}

下面是更多订阅方式,可根据需求替换上面代码中的订阅部分。

订阅多个主题:

String[] topics = {"myTopic1", "myTopic2", "myTopic3"};
consumer.subscribe(Arrays.asList(topics));

使用正则表达式订阅匹配的主题(动态分配分区):

consumer.subscribe(Pattern.compile("myTopic*"));

7、Idempotency and transactions(幂等性与事务)

在 Kafka 中,幂等性指的是生产者在重试发送消息时,即使发送多次,Kafka 也只会保存一份,不会出现重复消息。从 Kafka 0.11 开始,Kafka 自带幂等性功能,只需简单配置即可开启。

启用幂等性后,即使发生网络异常、超时,或者 Kafka 要求生产者重发消息,都不会产生重复数据。Kafka 会自动保证同一条消息只写一次。

不过,需要注意的是:

  • 幂等性只能保证生产者在当前程序运行期间不会重复写入
  • 幂等性只能在单个分区内保证精确一次
  • 并不能处理跨分区、跨会话(程序重启)的重复问题

如果业务场景需要保证“一批消息要么全部写成功,要么全部失败”,或者需要保证跨分区的精确一次语义,那么仅靠幂等性不够,需要使用 Kafka 的事务机制

Kafka 的事务机制可以把一批消息的写入看成一个整体,就像数据库的事务一样,可以做到:

  • 全成功
  • 全失败
  • 不会出现部分成功、部分失败的情况

事务常见的用途包括:

  • 生产者一次发送多条消息,必须全部成功
  • Consumer → 处理消息 → Producer 的模式,必须保证消费 offset 和生产的新消息同时成功,否则会造成重复消费或重复写入

事务使用步骤如下:

  • 设置唯一的事务 ID,用于标识生产者实例
  • 初始化事务
  • 开启事务
  • 发送一批消息
  • 提交事务
  • 发生异常时回滚事务

下面给出两个幂等性与事务性的代码。

幂等性 Producer 示例:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class IdempotentProducerExample {

    public static void main(String[] args) {

        // 创建配置对象,设置 Kafka Producer 的各种参数
        Properties props = new Properties();

        // Kafka 集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");

        // key 和 value 的序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 启用幂等性,保证消息在重试时不会重复
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        // 要求所有副本确认,保证更高的数据可靠性
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        // 创建 Producer 实例
        Producer<String, String> producer = new KafkaProducer<>(props);

        try {
            // 连续发送 10 条消息
            for (int i = 0; i < 10; i++) {

                // 创建消息对象
                ProducerRecord<String, String> record = new ProducerRecord<>("myTopic", "key" + i, "value" + i);

                // 异步发送消息,并添加回调处理结果
                producer.send(record, (metadata, e) -> {
                    if (e != null) {
                        System.err.println("发送失败:" + e.getMessage());
                    } else {
                        System.out.printf("发送成功:主题=%s 分区=%d offset=%d key=%s%n", metadata.topic(), metadata.partition(), metadata.offset(), record.key());
                    }
                });
            }

            // 将缓冲区中的数据强制发送出去
            producer.flush();

        } finally {
            // 关闭 Producer
            producer.close();
        }
    }
}

事务性 Producer 示例:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class TransactionalProducerExample {

    public static void main(String[] args) {

        // 创建配置对象
        Properties props = new Properties();

        // Kafka 地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");

        // 序列化器
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 设置事务 ID,必须唯一
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction-id");

        // 创建事务性 Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try {
            // 初始化事务,第一次使用事务必须调用
            producer.initTransactions();

            // 开始事务
            producer.beginTransaction();

            // 发送一批消息
            for (int i = 0; i < 5; i++) {
                ProducerRecord<String, String> record = new ProducerRecord<>("myTopic", "txKey" + i, "txValue" + i);

                producer.send(record);
            }

            // 全部成功后提交事务
            producer.commitTransaction();

        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {

            // 这些是无法恢复的异常,只能关闭 Producer
            producer.close();

        } catch (Exception e) {

            // 其他异常,回滚事务
            producer.abortTransaction();
            producer.close();
        }
    }
}

三、Kafka's Cluster Management and Monitoring(Kafka 的集群管理与监控)

本节将介绍 Kafka 集群的管理与监控,这对于保持 Kafka 环境的稳定与高效至关重要。Kafka 提供了一系列命令行工具,可以方便地管理集群中的主题和消费者组,并且无需停止集群就能修改配置。

1、Topic management(Topic 管理)

在对 Kafka 的主题进行操作之前,需要保证 Kafka 集群已经正常启动并运行。集群启动后,就可以在其中创建主题。Kafka 提供了命令行工具,可以直接在服务器上创建、查看、修改和删除主题。

例如,下面的命令可以创建一个名为 my-topic 的主题,并为其设置一个分区和一个副本:

bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic my-topic --partitions 1 --replication-factor 1

输出:

Created topic my-topic.

命令参数说明如下:

  • --bootstrap-server:指定 Kafka Broker 地址
  • --create:表示要创建主题
  • --topic:主题名称
  • --partitions:主题的分区数量
  • --replication-factor:每个分区的副本数量

执行成功后,Kafka 会在集群中创建对应的主题。

如果需要查看某个主题的详细信息,可以使用以下命令:

bin/kafka-topics.sh --bootstrap-server node1:9092 --describe --topic my-topic

输出会显示该主题的分区数量、副本因子、Leader 节点、所有副本列表以及当前同步副本(ISR)列表,例如:

Topic: my-topic TopicId: hBcHIKO9RxaGdbMfuOZDeQ PartitionCount: 1       ReplicationFactor: 1    Configs:
Topic: my-topic Partition: 0    Leader: 0       Replicas: 0     Isr: 0

字段含义如下:

  • PartitionCount:主题拥有的分区数
  • ReplicationFactor:每个分区的副本数
  • Replicas:该分区的所有副本所在的 Broker
  • Isr:当前处于同步状态的副本

如果想为已有主题增加分区,可以执行:

bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic my-topic --partitions 2

再次查看主题信息时,可以看到分区数量已经从 1 扩展到 2。需要注意的是,Kafka 不允许减少主题的分区数量,否则会导致 InvalidPartitionsException 错误。

Topic: my-topic TopicId: hBcHIKO9RxaGdbMfuOZDeQ PartitionCount: 2       ReplicationFactor: 1    Configs:
Topic: my-topic Partition: 0    Leader: 0       Replicas: 0     Isr: 0
Topic: my-topic Partition: 1    Leader: 0       Replicas: 0     Isr: 0

删除主题可以使用以下命令:

bin/kafka-topics.sh --bootstrap-server node1:9092 --delete --topic my-topic

默认情况下,删除操作会将主题标记为“待删除”。如果希望真正删除主题,需要确保在配置文件 server.properties 中开启以下配置:

delete.topic.enable=true

要查看 Kafka 集群中当前所有可用的主题,可以执行:

bin/kafka-topics.sh --bootstrap-server node1:9092 --list

通过这些命令,即可完成对 Kafka 主题的创建、查看、修改以及删除等常规管理操作。

2、Consumer Group Management(Consumer Group 管理)

Kafka 提供了 kafka-consumer-groups.sh 命令行工具,可以用它来列出、查看或删除消费者组。消费者组可以由系统自动删除,也可以由用户手动删除。当某个消费者组的所有成员都不再活跃,并且该组最后提交的 offset 已经过期时,Kafka 会自动清除该组。手动删除只有在该消费者组没有活跃成员时才有效。

如果使用的是旧版本 Kafka 的消费者客户端,可以使用 --zookeeper 和 --list 选项来列出消费者组;而新版客户端已经移除了 --zookeeper,因此需要使用 --bootstrap-server 和 --list 选项来查看消费者组。

示例:列出所有消费者组

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

示例输出可能如下:

consumer-group-1
consumer-group-2
consumer-group-3

其中,--bootstrap-server 参数必须指定一个 Kafka Broker 地址,格式为 host:port。

如果希望查看某个具体消费者组的详细信息(包括 offset 信息),可以将 --list 替换为 --describe,并使用 --group 指定消费者组:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group

示例输出内容包括:

  • 当前消费 offset
  • 最新消息的 offset
  • lag(落后消息数量)
  • 消费者 ID、所属主机、客户端 ID 等

例如:

TOPIC   PARTITION   CURRENT-OFFSET   LOG-END-OFFSET   LAG   CONSUMER-ID   HOST           CLIENTID
topic1  0           854144           855809           1665  consumer1...  /127.0.0.1     consumer1
topic2  0           460537           803290           342753 consumer1... /127.0.0.1     consumer1
topic3  2           243655           398812           155157 consumer4... /127.0.0.1     consumer4

如果想手动删除一个或多个消费者组,可以使用 --delete 选项:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

如果成功,会显示类似输出:

Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.

但是,如果某个消费者组仍然有成员存活,则会出现 GroupNotEmptyException 错误,此时无法删除。

除了查看 offset 之外,还可以使用 --delete-offsets 选项来删除消费者组对某些主题的 offset。该选项支持一个消费者组和多个主题。例如,从名为 my-group 的消费者组中删除 my-topic-1 和 my-topic-2 的 offset:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete-offsets --group my-group --topic my-topic-1 --topic my-topic-2

示例输出:

TOPIC        PARTITION   STATUS
my-topic-1   0           Successful
my-topic-2   0           Successful

Kafka 还支持重置消费者组的 offset。当需要重新消费消息,或当消费者在处理消息时出现问题导致需要将 offset 向前或向后调整时,这个功能非常有用。

可以使用 --reset-offsets 选项来重置 offset。需要注意,重置 offset 时只能同时操作一个消费者组,并且必须通过 --topic 或 --all-topics 指定范围,除非使用 --from-file 从文件导入。

例如,将某消费者组在 topic1 上的 offset 重置为最新值:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumer-group-1 --topic topic1 --to-latest

常用的 offset 重置方式包括:

  • --to-earliest:重置到最早的 offset
  • --to-latest:重置到最新的 offset
  • --to-current:重置到当前 offset
  • --to-offset <值>:重置到指定 offset
  • --shift-by <正负值>:相对于当前 offset 前移或后移指定数量
  • --to-datetime <时间>:重置到指定时间点的 offset,格式为 YYYY-MM-DDTHH:mm:SS.sss
  • --by-duration <时长>:基于当前时间的偏移,例如 P1DT2H(1天2小时)
  • --from-file:从 CSV 文件导入 offset 重置规则

这些工具可以帮助管理员方便地管理、删除及重置消费者组的 offset,从而更灵活地控制消息消费进度。

3、Dynamically Modify Server Configuration(动态修改服务器配置)

当我们需要修改 Kafka Broker 的 server.properties 配置文件时,通常需要停止 Broker、修改配置并重新启动。这种方式在生产环境中并不理想,因为它会导致服务中断。
为了解决这一问题,从 Kafka 1.1.0 开始,Kafka 支持动态修改 Broker 配置,即无需重启 Broker,就能让新的配置生效。

Kafka 提供了 kafka-configs.sh 工具来完成这些动态配置修改。通过这个工具设置的参数会持久化存储在 Kafka 的元数据系统中(现代 Kafka 已不再使用 Zookeeper)。

Kafka 中有许多与主题相关的配置项,这些配置可以为集群中的不同主题分别指定不同的参数,从而满足不同业务需求。如果不为某个主题显式设置配置,Kafka 会使用 Broker 层的默认配置。

修改主题配置的基本命令格式如下:

bin/kafka-configs.sh --bootstrap-server node1:9092 --alter --entity-type topics --entity-name <topic-name> --add-config key=value[,key=value...]

以下是一些常见的主题动态配置:

  • cleanup.policy:如果设置为 compact,则表示只保留每个 key 的最新一条消息,其余旧消息会被清理(日志压缩功能)
  • compression.type:Broker 写入磁盘时使用的压缩方式,例如 gzip、snappy、lz4 等。
  • delete.retention.ms:标记删除后的日志保留时间。
  • file.delete.delay.ms:删除主题日志段文件之前等待的时间(毫秒)。
  • flush.messages:写入多少条消息后强制 flush 到磁盘。
  • flush.ms:经过多少毫秒强制 flush 到磁盘。
  • index.interval.bytes:两个索引条目之间允许的最大字节数。
  • max.message.bytes:主题中单条消息允许的最大大小。
  • retention.bytes:该主题最多保留多少字节的数据。
  • retention.ms:消息在此主题中的最大存储时间(毫秒)。

例如,将 my-topic 主题的消息保留时间设置为 1 小时(3600000 毫秒):

bin/kafka-configs.sh --bootstrap-server node1:9092 --alter --entity-type topics --entity-name my-topic --add-config retention.ms=3600000

输出类似:

Completed updating config for entity: topic 'my-topic'.

Kafka 允许客户端覆盖的配置只有两类,与生产者和消费者的限速(Quota)相关,即允许某个 client.id 每秒向 Broker 发送或从 Broker 拉取的最大数据量(字节)。

例如,如果一个集群中有 5 个 Broker,并给某个 client.id 设置了 10MB/s 的配额,那么它可以在每个 Broker 上都使用 10MB/s,总共最高 50MB/s。

修改客户端限速配置的命令格式如下:

bin/kafka-configs.sh --bootstrap-server node1:9092 --alter --entity-type clients --entity-name <client-id> --add-config key=value[,key=value...]

支持的客户端限速配置如下:

  • producer_bytes_rate:单个 client.id 每秒允许向单个 Broker 发送的最大字节数,用于限制发送速率。
  • consumer_bytes_rate:单个 client.id 每秒允许从单个 Broker 消费的最大字节数,用于限制消费速率。

可以通过 kafka-configs.sh 查看某个主题或客户端的动态配置。使用 --describe 选项即可显示所有被覆盖的配置。例如,查看名为 my-topic 的主题的配置覆盖项:

bin/kafka-configs.sh --bootstrap-server node1:9092 --describe --entity-type topics --entity-name my-topic

示例输出:

Configs for topic 'my-topic' are retention.ms=3600000

如果需要删除动态配置,使其恢复为默认值,可以使用 --delete-config。例如,删除 my-topic 的 retention.ms 配置:

bin/kafka-configs.sh --bootstrap-server node1:9092 --alter --entity-type topics --entity-name my-topic --delete-config retention.ms

输出类似:

Completed updating config for entity: topic 'my-topic'.

删除后,该配置项会重新使用集群默认值。


评论