Kafka的CommitFailedException

此异常是我在公司测试环境下调试消费kafka数据存入Elasticsearch时发生的,发生此异常的原因和该异常的解决方案都非常简单,此处做下记录,以便日后查看。

1. 异常含义

异常名称:CommitFailedException

异常含义:位移 (offset) 提交失败时候抛出的异常。通常该异常被抛出时还会携带这样的一段话:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

2. 导致异常的原因

消费者组开启了rebalance且已然分配对应分区给其他消费者。这表明poll调用间隔超过了max.poll.interval.ms的值,这通常表示poll循环中的消息处理花费了太长的时间。

花费太长时间处理的原因有二:

A 确实是程序时间复杂度太高 B kafka有积压(这个是导致我司抛出这个异常的原因)

3. 解决方案

A. 增加session.timeout.ms值 (该参数在0.10.0.0版本之后进行了解耦,推出了max.poll.interval.ms参数)

B. 减少max.poll.records值

如上的两个解决方法实际上依然是0.10.0.0或之前版本时的解决之道,因为在那些版本中尚未提供max.poll.interval.ms参数,因此session.timeout.ms既用于失败检测,也用于控制消息处理时间,同时还承担着rebalance过程的超时控制。在0.10.1.0版本时社区对该参数的含义进行了解耦,推出了max.poll.interval.ms参数。实际上,在0.10.1.0.0或之后的版本中,社区推荐用户将session.timeout.ms设置一个很小的值,比如5s,但需要把max.poll.interval.ms设置成平均的消息处理时间。比如,假设你一次poll调用返回的消息数是N,你处理每条消息的平均时间是T,那么你需要设置max.poll.interval.ms稍稍大于N * T以保证poll调用间隔不会超过该阈值。

如此来看,如上的A解决办法应该修改成:增加max.poll.interval.ms值,而非session.timeout.ms.

4. 异常抛出时机

从源代码方面说,CommitFailedException异常通常发生在手动提交位移时,即用户显式调用KafkaConsumer.commitSync()方法。从使用场景来说,有两种场景可以抛出该异常:

A 消息处理时间 > max.poll.interval.ms时: 如前所述,这是该异常最“正宗”的出现场景。复现也比较容易,用户只需写一个consumer程序,订阅topic(即使用consumer.subscribe),设置max.poll.interval.ms=N,然后在consumer.poll循环中Thread.sleep(>N),之后手动提交位移即可复现,比如:

1
2
3
4
5
6
7
8
9
props.put("max.poll.interval.ms", 5000);
consumer.subscribe(Arrays.asList("topic1", "topic2", ...));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
// 处理消息
Thread.sleep(6000L);
consumer.commitSync();
}

B standalone consumer与consumer group冲突时:这里所说的standalone consumer指的是使用KafkaConsumer.assign()而非subscribe()的消费者。当用户系统中同时出现了standalone consumer和consumer group,并且它们的group id相同时,此时standalone consumer手动提交位移时就会立刻抛出此异常。这是因为group coordinator无法识别这个具有相同group id的consumer,从而向它返回“你不是一个合法的组成员”错误。目前Kafka consumer提交位移的代码中一旦碰到这个错误会立即抛出CommitFailedException。

5. 总结

针对异常抛出时机的第二种场景,我个人认为初始的抛出异常时的英文描述中完全没有提及,这实际上是该异常表述不清晰的一个表现。因为在提交位移的源代码中broker端返回“无效组成员”后,coordinator有可能认为这是一个新的成员,需要批准它加入组。这对于正常的组rebalance流程来说并没有什么问题,但对于standalone consumer而言该逻辑就显得有点不适用了。纵然不修改这个逻辑,至少也要完善CommitFailedException的异常表述,把这种情况加到异常说明里面。这样用户就能明确知晓诱发这种异常的所有场景,而不是像现在这样:只能尝试修改max.poll.records或max.poll.interval.ms。要知道对于第二种情况,无论用户如何设置max.poll.interval.ms或max.poll.records都无法规避。

坚持原创技术分享,您的支持将鼓励我继续创作!

------本文结束 感谢您的阅读------