1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @Slf4j public class MyOffsetCommitConsumer { public static void main(String[] args) { Properties conf = new Properties(); conf.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092;kafka2:9093;kafka3:9094"); conf.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); conf.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); conf.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); conf.put(ConsumerConfig.GROUP_ID_CONFIG, "my_group_1"); conf.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(conf); consumer.subscribe(Collections.singletonList("mytopic")); while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { log.info("consumerRecord {} {} {}", consumerRecord.key(), consumerRecord.offset(), consumerRecord.value()); } consumer.commitAsync((offsets, e) -> { if (e != null) { log.error("offset commit error",e); } }); } } }
|