Kafka发送和接收

生产者

Kafka 的 Producer 发送消息采用的是异步发送的方式。

在消息发送的过程中,涉及到了两个线程:main 线程和 Sender 线程,以及一个线程共享变量:RecordAccumulator(相当于缓冲区)。

main 线程将消息发送给 RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取消息发送到 Kafka broker。

消费者

为了专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。

自动提交 offset 的相关参数:

enable.auto.commit:是否开启自动提交 offset 功能

auto.commit.interval.ms:自动提交 offset 的时间间隔

手动提交

commitSync(同步提交):阻塞当前线程,一直到提交成功,并且会自动失败重试

commitAsync(异步提交):没有失败重试机制,故有可能提交失败

两种方式都会将本次 poll 的一批数据最高的偏移量提交

offset逻辑

Kafka 0.9 版本之前,offset 存储在 zookeeper

0.9 版本及之后,默认将 offset 存储在 Kafka的一个内置的 topic 中。名字是:__consumer_offsets

重平衡Rebalace

当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。

消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。

offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。

PS:大部分的分布式存储组件,比如:Kafka、Redis-cluster、es、Arangodb都有复杂的重平衡逻辑,因为为了保持每个节点数据均衡,有些会有坑。

监控组件

http://www.kafka-eagle.org/

kafka-manager

消息重试发送问题

对于一个有着先后顺序的消息A、B,正常情况下应该是A先发送完成后再发送B,但是在异常情况下,在A发送失败的情况下,B发送成功,而A由于重试机制在B发送完成之后重试发送成功了。这时对于本身顺序为AB的消息顺序变成了BA。

针对这种问题,严格的顺序消费还需要max.in.flight.requests.per.connection参数的支持。

该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,同时也会提升吞吐量。把它设为1就可以保证消息是按照发送的顺序写入服务器的。

此外,对于某些业务场景,设置max.in.flight.requests.per.connection=1会严重降低吞吐量,如果放弃使用这种同步重试机制,则可以考虑在消费端增加失败标记的记录,然后用定时任务轮询去重试这些失败的消息并做好监控报警。

唠嗑广场

刚工作的时候,第一个项目组使用MongoDB作为业务数据库,当时经验不足,所以也没想多少,现在工作了7年了,越来越觉得MongoDB不适合作为业务库,而是用来存格式不明确的文档数据,而且,在大部分团队不适合用MongoDB,业务变化太快,组员又比较随意,没有形成使用MongoDB的规范,最后导致schema不可知,数据乱七八糟,难以维护。