消费者

屏幕截图 2020-08-21 133318

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡

消费者通过向被指派为群组协调器的 broker(不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系

Properties props = new Properties();
//kafka 集群,broker-list
props.put("bootstrap.servers", "172.24.211.140:9092");
props.put("group.id", "consumer1");
props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
                        String>(props);
consumer.subscribe(List.of("test"));
while(true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.err.println(record);
    }
}

消费方式:

采用 pull(拉)模式从 broker 中读取数据

相较于服务器 push 模式,pull模式使消费者可以根据自己的情况来决定消费的速率,另一方面,pull模式每次也可以批量拉取数据,提高服务器吞吐量,而pull模式一旦有消息产生,就会立马推送,这意味着网络开销会比pull模式高一些。

但pull模式也有一些缺陷,如果没有消息的话,客户端就会在那里空转等待,所以Kafka为消费者的poll方法指定了一个时间参数,避免空转浪费CPU资源

配置

偏移量

每一个消费者组都会对有进行消费的主题分区的偏移量进行记录,这也就意味着消费者可以手动设置这个偏移量从头消费数据。

更新分区当前偏移量的操作叫作提交

Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic为__consumer_offsets。

自动提交:

手动提交:

把 auto.commit.offset 设为 false ,使用 commitSync()

异步提交 commitAsync() , 但该方法在发生错误时不会进行重试

再均衡监听:

订阅的时候传入 ConsumerRebalanceListener 实现相关接口

从特定偏移量开始处理:seekToBeginning(..)

读取特定偏移量:seek(..)

Kafka 消费异常没有手动提交导致的问题

假设本次偏移量消费到28 抛出异常 没有手动提交偏移量

该消费者下次迭代也会从29开始消费 为解决这个问题 所以只能新起消费者来替代此消费者

退出

其他线程调用consumer.wakeup() 会使consumer在poll抛出异常 然后进行close即可

没有群组的消费者

调用assign为其设置消费的分区