Kafka消息队列
发布日期:2021-09-18 01:37:05 浏览次数:3 分类:技术文章

本文共 3057 字,大约阅读时间需要 10 分钟。

一、再谈消息队列的应用场景

  1. 异步处理:例如短信通知、终端状态推送、App推送、用户注册等
  2. 数据同步:业务数据推送同步
  3. 重试补偿:记账失败重试
  4. 系统解耦:通讯上下行、终端异常监控、分布式事件中心
  5. 流量消峰:秒杀场景下的下单处理
  6. 发布订阅:HSF的服务状态变化通知、分布式事件中心
  7. 高并发缓冲:日志服务、监控上报

但是,我们对消息队列的底层技术和原理还是不了解,那么我们马上开始吧…

二、消息队列的一些基本概念和简单原理

1. Broker

Broker的概念来自与Apache ActiveMQ,通俗的讲就是MQ的服务器。

2. 消息的生产者、消费者

消息生产者Producer:发送消息到消息队列。

消息消费者Consumer:从消息队列接收消息。

消息队列全面了解,读完之后,大部分程序员收藏了...

 

3. 点对点消息队列模型

消息生产者向一个特定的队列发送消息,消息消费者从该队列中接收消息;

消息的生产者和消费者可以不同时处于运行状态。

每一个成功处理的消息都由消息消费者签收确认(Acknowledge)。如图:

消息队列全面了解,读完之后,大部分程序员收藏了...

 

4. 发布订阅消息模型-Topic

发布订阅消息模型中,支持向一个特定的主题Topic发布消息,0个或多个订阅者接收来自这个消息主题的消息。在这种模型下,发布者和订阅者彼此不知道对方。实际操作过程中,

发布订阅消息模型中,支持向一个特定的主题Topic发布消息,0个或多个订阅者接收来自这个消息主题的消息。在这种模型下,发布者和订阅者彼此不知道对方。实际操作过程中,

必须先订阅,再发送消息,而后接收订阅的消息,这个顺序必须保证。

消息队列全面了解,读完之后,大部分程序员收藏了...

 

5. 消息的顺序性保证

基于Queue消息模型,利用FIFO先进先出的特性,可以保证消息的顺序性。

6. 消息的ACK确认机制

即消息的Ackownledge确认机制,

为了保证消息不丢失,消息队列提供了消息Acknowledge机制,即ACK机制,当Consumer确认消息已经被消费处理,发送一个ACK给消息队列,此时消息队列便可以删除这个消息了。如果Consumer宕机/关闭,没有发送ACK,消息队列将认为这个消息没有被处理,会将这个消息重新发送给其他的Consumer重新消费处理。

7. 消息的持久化

消息的持久化,对于一些关键的核心业务来说是非常重要的,启用消息持久化后,消息队列宕机重启后,消息可以从持久化存储恢复,消息不丢失,可以继续消费处理。

8. 消息的同步和异步收发

同步:消息的收发支持同步收发的方式。

同时还有另一种同步方式:同步收发场景下,消息生产者和消费者双向应答模式,例如:张三写封信送到邮局中转站,然后李四从中转站获得信,然后在写一份回执信,放到中转站,然后张三去取,当然张三写信的时候就得写明回信地址

消息的接收如果以同步的方式(Pull)进行接收,如果队列中为空,此时接收将处于同步阻塞状态,会一直等待,直到消息的到达。

异步:消息的收发同样支持异步方式:异步发送消息,不需要等待消息队列的接收确认;异步接收消息,以Push的方式触发消息消费者接收消息。

9. 消息的事务支持

消息的收发处理支持事务,例如:在任务中心场景中,一次处理可能涉及多个消息的接收、处理,这处于同一个事务范围内,如果一个消息处理失败,事务回滚,消息重新回到队列中。

三、我们对消息队列的实际使用

我们使用了两种消息队列组件:

RabbitMQ:高可用、高可靠消息应用场景,例如记账失败重试、通知服务,消息不允许丢

Kafka:高性能消息应用场景,例如日志、监控,消息允许丢失。

在此之上,我们封装了消息应用中心、日志服务等核心组件和服务。那么,消息应用中心和日志都用到了消息队列什么技术? 干货来了…

1. 消息应用中心

消息应用中心(任务中心)使用了消息队列的异步处理、数据同步、重试补偿、系统解耦、流量消峰等特性。其中:

消息应用中心(任务中心),支持RabbitMQ和Kafka两种消息通道,支持在任务元数据层面设置

任务:就是一个包含了任务执行上下文的消息,同时代表了异步处理

任务发送者(ITaskSender)发送任务:消息的生产者将任务消息发送的消息队列

任务类型:消息队列名称,例如:HaKeepAcco***Queue,充电补偿记账队列

消息队列:任务的临时存储

任务中心:任务集中处理,消息消费者

任务处理完成:消息Ack确认

任务的多级重试:多个重试消息队列,HaSysTaskStore2Queue

2. 日志组件

日志组件,使用了消息队列的高并发缓冲和发布订阅特性。其中:

日志组件使用Kafka作为消息通道,因为Kafka的性能好,吞吐量大, 可以容忍偶尔的消息数据丢失

日志组件使用发布订阅的消息模型

日志组件包含日志服务SDK和日志HSF服务,二者都是消息的生产者Producer

日志类型:消息的Topic主题

日志处理器:消息的消费者、Topic的订阅、日志数据处理(Hbase\ES\其他)

3. RPC服务状态变化通知

RPC服务状态变化通知,使用了消息队列的发布订阅特性。其中:

RPC服务状态变化通知,使用了RabbitMQ消息队列技术

使用发布订阅的消息模型

Topic:RPCServiceState

RPCService.Proxy:RPC服务状态变化消息的订阅者

RPC服务注册、发布:消息的生产者,发送RPC服务状态变化消息。

四、消息队列使用的最佳实践

1. RabbitMQ的连接,底层都是Socket连接,长连接 or 短连接?

RabbitMQ每个在创建每个连接的同时,会自动创建一个监视线程来定时(默认60s)侦测连接的状态,如果连接断开,触发ConnectionShutdown事件。

用长连接,还是用短连接??

发送端:建议使用短连接,用完即释放,避免长连接带来的端口占用,因为发送端无处不在,发送操作短而急促。

接收端:建议使用长连接,时刻接收处理消息,因为消息的接收消费比较集中,接收操作久而弥坚。

2. 网络是有抖动的,连接的断开是正常的,如何应对?

发送端:发送失败重试

接收端:注册ConnectionShutdown事件同时捕获消息接收异常,重新建立连接,接收消费消息

3. RabbitMQ Exchange(Topic)模式下带来的消息队列数量激增

只是创建了一个Exchange(Topic),为什么会增加这么多Queue。

因为,每个Topic的订阅都是绑定一个Queue用作消息的消费。

消息队列全面了解,读完之后,大部分程序员收藏了...

 

4. 需求的演变,消息结构的变更,如何平滑过度?

消息是byte[]数组,我们将复杂对象消息二进制序列化。

接收到消息后,我们将二进制数组反序列化为实体类。

当我们的实体类消息体的结构发生变化后,因为受二进制序列化处理的

影响,导致无法反序列化。

解决方案:

消息体预留一些string类型的扩展字段

消息队列版本化,支持多个版本的消息体。

5. Kafka Consumer Group

同一Topic的一条消息只能被同一个Group内的一个Consumer消费

多个Consumer Group可同时消费同一条消息

消息队列全面了解,读完之后,大部分程序员收藏了...

 

6. 消息的积压

消息的积压产生的原因:消息接收消费的速率低,发送的速度>接收的速度。

消息积压后的影响:

消息大量积压后,当新的消费者连接上MQ并开始接收消息时,发送速率会大幅降低。

消息队列集群的压力增加,大量的消息要持久化存储和同步。

如何减少消息积压:快速消费消息,同时保持消息体的不要过大

转载地址:https://blog.csdn.net/yue_2018/article/details/102998943 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:保证分布式系统数据一致性的6种方案
下一篇:Linux编辑器介绍

发表评论

最新留言

第一次来,支持一个
[***.219.124.196]2024年03月02日 03时37分53秒

关于作者

    喝酒易醉,品茶养心,人生如梦,品茶悟道,何以解忧?唯有杜康!
-- 愿君每日到此一游!

推荐文章

PHP exec xargs 不执行,Linux中的xargs命令及示例 2019-04-21
php 枚举cookie内容,php设置和获取cookie 2019-04-21
单防区扩展模块怎么用_AB罗克韦尔自动化Micro800 扩展 I/O模块型号及功能介绍 2019-04-21
java矩阵类_Java泛型——泛型矩阵类 2019-04-21
java车牌正则表达式_车牌正则表达式 2019-04-21
wordpress4.9.4 mysql_WordPress 将不再支持 PHP4 和 MySQL 4 2019-04-21
安卓是用java语言写的吗_android开发是用java语言吗? 2019-04-21
java 符号 t_java – 运算符”不能应用于’T’,’T’表示有界泛型类型 2019-04-21
用matlab写出信源熵,计算离散信源的熵matlab实现 2019-04-21
php表单yii2,Yii2创建表单(ActiveForm)方法详解 2019-04-21
php 程序授权机制,授权认证详细说明 2019-04-21
java 命令提示符,如何使用Java打开命令提示符并插入命令? 2019-04-21
IP/tzgm.php,LianjiaSpider/在售数量.ipynb at master · BerSerK/LianjiaSpider · GitHub 2019-04-21
linux移动文件的脚本,使用Linux脚本移动文件 2019-04-21
linux查看系统所有变量,Linux系统各指标命令 2019-04-21
linux打印机守护程序,linux下怎么编写守护程序呢? 2019-04-21
嵌入式linux 设置时间,time_clock控件应用,使用命令date -s 12:00:00手动设置时间后,时间就停住不走了,我在Ubuntu和嵌入式Linux平台都测试到了... 2019-04-21
linux 8086下编译,Ubuntu18.04/Linux下安装DosBox进行8086汇编 2019-04-21
linux监控windows,zabbix监控之linux及windows客户端安装配置 2019-04-21
linux中怎么卸载tree,Liunx系统命令中tree命令详解 2019-04-21