Kafka之不丢消息

本文浅谈Kafka在实践过程中如何保证不丢失消息

abstract.png

生产者侧

从生产者侧角度出发通过获取消息发送的结果,来避免由于生产者发送失败而造成的消息丢失。在下面生产者发送消息的几种实现方式中,sendMsg1方法即是直接发送消息;而sendMsg2、sendMsg3则是分别通过同步阻塞、异步回调地方式来获取生产者发送消息的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Component
@Slf4j
public class MyProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMsg1(String topic, String key, String value) {
try {
kafkaTemplate.send(topic, key, value);
}catch (Exception e) {
log.error("Send Non OK, Exception: {}", e.getMessage());
}
}

public void sendMsg2(String topic, String key, String value) {
try{
ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, key, value);
// 同步阻塞式获取发送结果
SendResult<String, String> sendResult = listenableFuture.get();
ProducerRecord producerRecord = sendResult.getProducerRecord();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
log.info("Send OK");
}catch (Exception e) {
log.error("Send Non OK, Exception: {}", JSON.toJSONString(e));
}
}

public void sendMsg3(String topic, String key, String value) {
try{
ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, key, value);
// 自定义处理发送结果的回调函数
listenableFuture.addCallback(
sendResult -> {
ProducerRecord producerRecord = sendResult.getProducerRecord();
RecordMetadata recordMetadata = sendResult.getRecordMetadata();
log.info("Send OK");
},
throwable -> {
log.error("Send Non OK, Exception: {}", throwable.getMessage());
}
);
}catch (Exception e) {
log.error("Send Non OK, Exception: {}", JSON.toJSONString(e));
}
}

}

而在KafkaTemplate中有一个ProducerListener属性,可用于为KafkaTemplate实例设置全局的统一的处理发送结果回调方法,这样即使通过上文的sendMsg1方法进行消息发送,也可以获取相应发送结果。实现方式也可很简单,只需提供一个ProducerListener的实现类即可,如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
@Component
public class KafkaSendResultHandler implements ProducerListener {

@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
log.info("Send OK");
}

@Override
public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
log.error("Send Non OK, Exception: {}", exception.getMessage());
}
}

Broker侧

主题副本数

Kafka可为主题设置副本数量,其作用于该主题下的各分区。如下实例代码即创建了一个名为topic_alarm_in的主题,其使用4个分区。每个分区有3份数据副本,其中1个副本为Leader副本、剩余2个为Follower副本。通过使用多副本机制以避免Kafka丢失数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Configuration
public class TopicConfig {

public static final String TOPIC_ALARM_IN = "topic_alarm_in";

@Bean
public NewTopic topic1() {
return TopicBuilder.name(TOPIC_ALARM_IN)
// 分区数
.partitions(4)
// 副本数 (包含Leader副本、Follower副本)
.replicas(3)
// ISR中数据副本(含Leader副本)的最小数量要求
.config("min.insync.replicas", "2")
.build();
}

}

重试次数

事实上对于生产者而言,当消息发送失败后Kafka会自动进行重试。具体地,可通过生产者的 retries 参数设置Kafka发送失败时自动重试的次数

应答机制

对于生产者而言Kafka还提供了一个应答机制,用于控制生产者发送消息的持久性。具体地,可通过生产者的 acks参数进行控制。其可选值及含义如下所示

  • 0: 此时生产者根本不会等待来自Broker的任何确认。消息会立即添加到Socket Buffer中并视为已发送
  • 1: 此时意味当消息只要被写入Leader副本的本地日志后,即视为已发送。而不会等待其他所有的Follower副本确认。此种场景下,当Leader副本刚刚确认消息发送成功,但其他Follower副本还未来得及复制同步时,该Leader副本发生宕机即会造成消息丢失
  • all: 其与将acks设置为-1的意义等价。Leader副本会等待所有Follower副本的结果,只有ISR(In-Sync Replicas)中所有副本都确认完毕,才会视为发送成功。这样只要该ISR中尚有一个副本存活、未宕机,即可保证消息不会丢失

ISR最小数量

当生产者的acks参数设置为all后, 生产者发送的消息不仅需要得到ISR中全部副本的确认,还需要满足min.insync.replicas ISR最小数量参数的要求,才会视为消息发送成功。例如上文的topic_alarm_in主题配置了副本数为3,但某分区的一个副本所在Broker节点发生宕机后,则该分区的实际副本即从3变为2了,即此时该分区的ISR调整为2。此时生产者发送一条消息到该分区,显然这里生产者acks参数设置为all了。假设现在该分区全部的两个副本均确认了,如果min.insync.replicas参数配置为2,则不会有任何问题,消息发送成功;但如果min.insync.replicas参数配置为3,虽然ISR中副本全部确认了,但由于只有两个副本未达到min.insync.replicas参数所要求的三个副本。故消息发送失败,一方面Broker不会存储该消息;另一方面,生产者会收到NotEnoughReplicasException异常

配置示例

上文retries、acks参数的配置示例如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
server:
port: 8069

spring:
kafka:
# 集群地址
bootstrap-servers:
- 192.168.19.2:8001
- 192.168.19.2:8002
- 192.168.19.2:8003
# 生产者
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 重试次数
retries: 5
# 应答机制
acks: -1

消费者侧

消费者侧默认使用自动提交机制,其会在后台定时自动提交offset偏移量。具体地,可通过enable.auto.commit、auto.commit.interval.ms参数分别设置是否启用自动提交、自动提交的频率。下面即是一个SpringBoot下消费者关于自动提交机制的配置示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
server:
port: 8069

spring:
kafka:
# 集群地址
bootstrap-servers:
- 192.168.19.2:8001
- 192.168.19.2:8002
- 192.168.19.2:8003
# 消费者
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 使能自动提交
enable-auto-commit: true
# 自动提交的频率, Unit: ms
auto-commit-interval: 996

事实上自动提交机制也会导致消息丢失的问题。例如当消费者侧刚刚接收到消息后,如果后台线程恰好在此时提交了offset偏移量。在消费者侧还未来得及对此条消息进行业务处理时,消费者服务宕机了。可以看到此时Kafka认为该条消息已经被消费了,但消费者服务事实上并未完成对该条消息的业务处理。解决方案也很简单,关闭自动提交,转而使用手动提交

首先在配置文件中关闭自动提交

1
2
3
4
5
6
spring:
kafka:
# 消费者
consumer:
# 关闭自动提交
enable-auto-commit: false

然后通过Java配置类自定义ConsumerFactory、ConcurrentKafkaListenerContainerFactory实例,如下所示。其中对于ConcurrentKafkaListenerContainerFactory实例而言,将ackMode设置为MANUAL_IMMEDIATE。进一步地对于手动提交而言,其存在两种方式:同步提交、异步提交。前者是阻塞式的;而后者由于是异步提交,故不会阻塞当前服务。但如果提交失败了是不会进行重试的。这里我们将syncCommits设置为true,即使用同步提交方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfig {

@Autowired
private KafkaProperties kafkaProperties;

private Map<String, Object> consumerProperties(){
Map<String, Object> props = new HashMap<>( kafkaProperties.buildConsumerProperties() );
return props;
}

@Bean
public DefaultKafkaConsumerFactory<String, String> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
return new DefaultKafkaConsumerFactory<>( consumerProperties() );
}

@Bean("manualListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory( consumerFactory );
// 将ackMode设置为MANUAL_IMMEDIATE
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// true: 同步提交; false: 异步提交
factory.getContainerProperties(
).setSyncCommits(true);
return factory;
}
}

然后在消费者处理方法上添加@KafkaListener注解,通过containerFactory属性设置ConcurrentKafkaListenerContainerFactory实例的名称。并在业务处理完消息后通过Acknowledgment实例的acknowledge方法实现提交offset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class MyConsumer {

@KafkaListener(topics=TOPIC_ALARM_IN, groupId="myGroup1", containerFactory = "manualListenerContainerFactory")
public void handle(ConsumerRecord<String, String> record, Acknowledgment ack) {
// 反序列化以进行业务处理
AlarmIn alarmIn = JSON.parseObject(record.value(), AlarmIn.class);
int index = record.partition();
long offset = record.offset();
System.out.println("[myGroup1] <c1>: alarmIn: " + alarmIn + ", partition: " + index +", offset: " + offset);

// 业务处理完毕再手动提交offset
ack.acknowledge();
}
}

参考文献

  1. Kafka权威指南 Neha Narkhede、Gwen Shapira、Todd Palino著
0%