本文浅谈Kafka在实践过程中如何保证不丢失消息
生产者侧
从生产者侧角度出发通过获取消息发送的结果,来避免由于生产者发送失败而造成的消息丢失。在下面生产者发送消息的几种实现方式中,sendMsg1方法即是直接发送消息;而sendMsg2、sendMsg3则是分别通过同步阻塞、异步回调地方式来获取生产者发送消息的结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| @Component @Slf4j public class MyProducer { @Autowired private KafkaTemplate<String, String> kafkaTemplate;
public void sendMsg1(String topic, String key, String value) { try { kafkaTemplate.send(topic, key, value); }catch (Exception e) { log.error("Send Non OK, Exception: {}", e.getMessage()); } }
public void sendMsg2(String topic, String key, String value) { try{ ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, key, value); SendResult<String, String> sendResult = listenableFuture.get(); ProducerRecord producerRecord = sendResult.getProducerRecord(); RecordMetadata recordMetadata = sendResult.getRecordMetadata(); log.info("Send OK"); }catch (Exception e) { log.error("Send Non OK, Exception: {}", JSON.toJSONString(e)); } }
public void sendMsg3(String topic, String key, String value) { try{ ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, key, value); listenableFuture.addCallback( sendResult -> { ProducerRecord producerRecord = sendResult.getProducerRecord(); RecordMetadata recordMetadata = sendResult.getRecordMetadata(); log.info("Send OK"); }, throwable -> { log.error("Send Non OK, Exception: {}", throwable.getMessage()); } ); }catch (Exception e) { log.error("Send Non OK, Exception: {}", JSON.toJSONString(e)); } }
}
|
而在KafkaTemplate中有一个ProducerListener属性,可用于为KafkaTemplate实例设置全局的统一的处理发送结果回调方法,这样即使通过上文的sendMsg1方法进行消息发送,也可以获取相应发送结果。实现方式也可很简单,只需提供一个ProducerListener的实现类即可,如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Slf4j @Component public class KafkaSendResultHandler implements ProducerListener {
@Override public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) { log.info("Send OK"); }
@Override public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) { log.error("Send Non OK, Exception: {}", exception.getMessage()); } }
|
Broker侧
主题副本数
Kafka可为主题设置副本数量,其作用于该主题下的各分区。如下实例代码即创建了一个名为topic_alarm_in的主题,其使用4个分区。每个分区有3份数据副本,其中1个副本为Leader副本、剩余2个为Follower副本。通过使用多副本机制以避免Kafka丢失数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Configuration public class TopicConfig {
public static final String TOPIC_ALARM_IN = "topic_alarm_in";
@Bean public NewTopic topic1() { return TopicBuilder.name(TOPIC_ALARM_IN) .partitions(4) .replicas(3) .config("min.insync.replicas", "2") .build(); } }
|
重试次数
事实上对于生产者而言,当消息发送失败后Kafka会自动进行重试。具体地,可通过生产者的 retries 参数设置Kafka发送失败时自动重试的次数
应答机制
对于生产者而言Kafka还提供了一个应答机制,用于控制生产者发送消息的持久性。具体地,可通过生产者的 acks参数进行控制。其可选值及含义如下所示
- 0: 此时生产者根本不会等待来自Broker的任何确认。消息会立即添加到Socket Buffer中并视为已发送
- 1: 此时意味当消息只要被写入Leader副本的本地日志后,即视为已发送。而不会等待其他所有的Follower副本确认。此种场景下,当Leader副本刚刚确认消息发送成功,但其他Follower副本还未来得及复制同步时,该Leader副本发生宕机即会造成消息丢失
- all: 其与将acks设置为-1的意义等价。Leader副本会等待所有Follower副本的结果,只有ISR(In-Sync Replicas)中所有副本都确认完毕,才会视为发送成功。这样只要该ISR中尚有一个副本存活、未宕机,即可保证消息不会丢失
ISR最小数量
当生产者的acks参数设置为all后, 生产者发送的消息不仅需要得到ISR中全部副本的确认,还需要满足min.insync.replicas ISR最小数量参数的要求,才会视为消息发送成功。例如上文的topic_alarm_in主题配置了副本数为3,但某分区的一个副本所在Broker节点发生宕机后,则该分区的实际副本即从3变为2了,即此时该分区的ISR调整为2。此时生产者发送一条消息到该分区,显然这里生产者acks参数设置为all了。假设现在该分区全部的两个副本均确认了,如果min.insync.replicas参数配置为2,则不会有任何问题,消息发送成功;但如果min.insync.replicas参数配置为3,虽然ISR中副本全部确认了,但由于只有两个副本未达到min.insync.replicas参数所要求的三个副本。故消息发送失败,一方面Broker不会存储该消息;另一方面,生产者会收到NotEnoughReplicasException异常
配置示例
上文retries、acks参数的配置示例如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| server: port: 8069
spring: kafka: bootstrap-servers: - 192.168.19.2:8001 - 192.168.19.2:8002 - 192.168.19.2:8003 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer retries: 5 acks: -1
|
消费者侧
消费者侧默认使用自动提交机制,其会在后台定时自动提交offset偏移量。具体地,可通过enable.auto.commit、auto.commit.interval.ms参数分别设置是否启用自动提交、自动提交的频率。下面即是一个SpringBoot下消费者关于自动提交机制的配置示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| server: port: 8069
spring: kafka: bootstrap-servers: - 192.168.19.2:8001 - 192.168.19.2:8002 - 192.168.19.2:8003 consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer enable-auto-commit: true auto-commit-interval: 996
|
事实上自动提交机制也会导致消息丢失的问题。例如当消费者侧刚刚接收到消息后,如果后台线程恰好在此时提交了offset偏移量。在消费者侧还未来得及对此条消息进行业务处理时,消费者服务宕机了。可以看到此时Kafka认为该条消息已经被消费了,但消费者服务事实上并未完成对该条消息的业务处理。解决方案也很简单,关闭自动提交,转而使用手动提交
首先在配置文件中关闭自动提交
1 2 3 4 5 6
| spring: kafka: consumer: enable-auto-commit: false
|
然后通过Java配置类自定义ConsumerFactory、ConcurrentKafkaListenerContainerFactory实例,如下所示。其中对于ConcurrentKafkaListenerContainerFactory实例而言,将ackMode设置为MANUAL_IMMEDIATE。进一步地对于手动提交而言,其存在两种方式:同步提交、异步提交。前者是阻塞式的;而后者由于是异步提交,故不会阻塞当前服务。但如果提交失败了是不会进行重试的。这里我们将syncCommits设置为true,即使用同步提交方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Configuration @EnableConfigurationProperties(KafkaProperties.class) public class KafkaConfig {
@Autowired private KafkaProperties kafkaProperties;
private Map<String, Object> consumerProperties(){ Map<String, Object> props = new HashMap<>( kafkaProperties.buildConsumerProperties() ); return props; }
@Bean public DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) { return new DefaultKafkaConsumerFactory<>( consumerProperties() ); }
@Bean("manualListenerContainerFactory") ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory( consumerFactory ); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); factory.getContainerProperties( ).setSyncCommits(true); return factory; } }
|
然后在消费者处理方法上添加@KafkaListener注解,通过containerFactory属性设置ConcurrentKafkaListenerContainerFactory实例的名称。并在业务处理完消息后通过Acknowledgment实例的acknowledge方法实现提交offset
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Component public class MyConsumer {
@KafkaListener(topics=TOPIC_ALARM_IN, groupId="myGroup1", containerFactory = "manualListenerContainerFactory") public void handle(ConsumerRecord<String, String> record, Acknowledgment ack) { AlarmIn alarmIn = JSON.parseObject(record.value(), AlarmIn.class); int index = record.partition(); long offset = record.offset(); System.out.println("[myGroup1] <c1>: alarmIn: " + alarmIn + ", partition: " + index +", offset: " + offset);
ack.acknowledge(); } }
|
参考文献
- Kafka权威指南 Neha Narkhede、Gwen Shapira、Todd Palino著