confluent-kafka-go 消费消息防止丢失(至少一次处理)

1.为什么消息会丢失?

消费者poll消息得过程(poll的意思是从broker拿消息,并不代表拿到就消费成功了)

  • 消费者建立了与broker之间的⻓连接,开始poll消息。
  • 默认一次poll 500条消息

自动提交offset

消费者poll到消息后默认情况下,会自动向broker的_consumer_offsets主题提交当前主题-分区消费的偏移量。

自动提交会丢消息: 因为如果消费者还没消费完poll下来的消息就自动提交了偏移量,那么此 时消费者挂了,于是下一个消费者会从已提交的offset的下一个位置开始消费消息。之前未被消费的消息就丢失掉了。

原文链接:blog.csdn.net/m0_52174905/article/...

对于一个简单的数据转换服务,“已处理”意味着,简单地说,一条消息已经进入并被转换,然后被生产回 Kafka。对于任何其他情况,我们会认为消息未处理。这很重要,因为如果您使用,但您的服务在设法生产 之前死亡,您不想提交该输入消息的偏移量 - 您需要让它再次被拾取。

这就是我们问题的根源,因为不幸的是,这不是自动提交的工作方式。Kafka 消费者不知道你对消息做了什么,而且对提交偏移量更加漠不关心。就消费者而言,只要一条消息被拉入,它就会被“处理”。

所以现在想象你的消费者已经拉入 1000 条消息并将它们缓冲到内存中。然后触发自动提交,提交这 1,000 条消息的偏移量。但是,假设您的服务现在使用了太多内存,并且在处理所有消息之前被 OOM 终止信号立即关闭。数百条这样的消息将永远不会被处理。那是数据丢失

更详细解释内容可查看: newrelic.com/blog/best-practices/k...

2.解决问题(引用包为confluent-kafka-go)

至少处理一次参考连接:github.com/confluentinc/confluent-...

不同语言,不同包解决方法可能略有差别

此包中GitHub大神解释:

with librdkafka based clients, auto.offset.commit / enable.auto.offset.store are true / true by default and as you note, this does not give at least once semantics because the commit happens in a background thread independently of the application polling the consumer for new messages. By comparison, with the java client, auto.offset.commit does give at least once semantics, because the commit happens as a side effect of the next consumer.poll.

The best way to achieve at least once semantics is to disable enable.auto.offset.store, which marks a message as eligible for commit as soon as it’s delivered to the application and use StoreOffsets to manually indicate this instead. leave auto.offset.commit as true.

对于基于 librdkafka 的客户端,默认情况下auto.offset.commit/enable.auto.offset.store是true/true并且正如您所注意到的,这不会提供至少一次语义,因为提交发生在后台线程中,独立于应用程序轮询消费者以获取新消息。相比之下,对于 java 客户端,auto.offset.commit确实提供了至少一次语义,因为提交是作为下一个 consumer.poll 的副作用发生的。

实现至少一次语义的最佳方法是禁用enable.auto.offset.store,这将在消息传递到应用程序后立即将消息标记为符合提交条件,并使用 StoreOffsets 手动指示这一点。保持auto.offset.commit真实。

3.代码示例

github.com/wedneyyuri/confluent-ka...

初始化消费者时增加配置

| “enable.auto.offset.store”: false, |
| “enable.auto.commit”: true, |

消费时调用c.StoreMessage(e);并且在本次没有提交之前,下次的offset无法提交,因此在每次批量消费时还需注意根据offset排序后提交offset,否则无法提交具体解释看示例代码注释部分

防止消息重复可以生产时加uuid 消费时判断重复不在消费即可。

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!