Kafka发送和接收
生产者
Kafka 的 Producer 发送消息采用的是异步发送的方式。
在消息发送的过程中,涉及到了两个线程:main 线程和 Sender 线程,以及一个线程共享变量:RecordAccumulator(相当于缓冲区)。
main 线程将消息发送给 RecordAccumulator,Sender 线程不断从RecordAccumulator 中拉取消息发送到 Kafka broker。
消费者
为了专注于自己的业务逻辑,Kafka 提供了自动提交 offset 的功能。
自动提交 offset 的相关参数:
enable.auto.commit:是否开启自动提交 offset 功能
auto.commit.interval.ms:自动提交 offset 的时间间隔
手动提交
commitSync(同步提交):阻塞当前线程,一直到提交成功,并且会自动失败重试
commitAsync(异步提交):没有失败重试机制,故有可能提交失败
两种方式都会将本次 poll 的一批数据最高的偏移量提交
offset逻辑
Kafka 0.9 版本之前,offset 存储在 zookeeper
0.9 版本及之后,默认将 offset 存储在 Kafka的一个内置的 topic 中。名字是:__consumer_offsets
重平衡Rebalace
当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。
消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。
offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalace。
PS:大部分的分布式存储组件,比如:Kafka、Redis-cluster、es、Arangodb都有复杂的重平衡逻辑,因为为了保持每个节点数据均衡,有些会有坑。
监控组件
kafka-manager
消息重试发送问题
对于一个有着先后顺序的消息A、B,正常情况下应该是A先发送完成后再发送B,但是在异常情况下,在A发送失败的情况下,B发送成功,而A由于重试机制在B发送完成之后重试发送成功了。这时对于本身顺序为AB的消息顺序变成了BA。
针对这种问题,严格的顺序消费还需要max.in.flight.requests.per.connection参数的支持。
该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,同时也会提升吞吐量。把它设为1就可以保证消息是按照发送的顺序写入服务器的。
此外,对于某些业务场景,设置max.in.flight.requests.per.connection=1会严重降低吞吐量,如果放弃使用这种同步重试机制,则可以考虑在消费端增加失败标记的记录,然后用定时任务轮询去重试这些失败的消息并做好监控报警。
唠嗑广场
刚工作的时候,第一个项目组使用MongoDB作为业务数据库,当时经验不足,所以也没想多少,现在工作了7年了,越来越觉得MongoDB不适合作为业务库,而是用来存格式不明确的文档数据,而且,在大部分团队不适合用MongoDB,业务变化太快,组员又比较随意,没有形成使用MongoDB的规范,最后导致schema不可知,数据乱七八糟,难以维护。