[Kafka]ReConsume

kafka預設保存7天(168 hour)的log在disk,處理message過程中可能會出現異常或非預期錯誤(如網路中斷、disk 問題),

這時有可能造成我們資料遺失或不一致,這篇來看看如何重新consume這些資料。

kafka要重新consume某個topic資料,須指定partition和 offset即可,下面簡單示範

目前該consumer group沒有任何Lag。

 

查看該topic的offset 最大值 (-2 最小值)

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.17.0.3:9092 -topic topic --time -1

Partition 0 :最大33

 

透過kafka-console-consumer.sh 指定從offset 30 開始再次消費(假設我們從30後就沒有consume資料)

bin/kafka-console-consumer.sh --bootstrap-server 172.17.0.3:9092 --topic topic --consumer.config config/consumer.properties --offset 30 --partition 0

 

參考

kafka ops tools

Kafka-2.11学习笔记(二)Shell脚本介绍

Kafka - Rewind Consumer Offsets