RabbitMQ的知识和使用
一 Rabbitmq简介
*rabbitmq是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,rabbitmq是使用二郎语言来编写的,并且rabbitmq是基于AMQP协议的.
1. 特点
- 与springAMQP完美的整合,API丰富
- 集群模式丰富,表达式配置,ha模式,镜像队列模式
- 保证数据不丢失的前提做到高可靠性,可用性
二 AMQP协议
1. 定义
是具有现代特征的二进制协议.是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.
2. 核心概念
- Server:又称Broker,接受客户端的连接,实现AMQP实体服务
- connection:连接,应用程序与broker的网络连接
- Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道.客户端可建立多个Channel,每个Channel代表个会任务.
- message:消息,服务器和应用程序之间传送的数据,由properties和body组成.properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性;body则就是消息体内容.
- virtual host: 虚拟地址,用于进行逻辑隔离,最上层的消息路由.一个virtual host里面可以有若干个exchange和queue, 同一个virtual host里面不能有相同名称的exchange或queue
- exchange:交换机,接收消息,根据路由键转发消息到绑定的队列
- binding:exchange和queue之间的虚拟连接,binding中可以包含routing key
- Routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息
- queue:消息队列,保存消息并将它们转发给消费者
3. exchange属性
- Name: 交换机名称
- Type:交换机类型direct,topic,fanout,headers
- Durability:是否需要持久化,true为持久化
- Auto Delete:当最后一个绑定到exchange上的队列删除后,自动删除该exchange
- Internal:当前exchange是否用于rabbitmq内部使用,默认false
- Arguments:扩展参数,用于扩展AMQP协议自制定化使用
1. Direct Exchange
- 所有发送到Direct Exchange的消息被转发到Routekey中指定queue
- Direct模式可以使用rabbitmq自带的exchange:default exchange, 所有不需要将exchange进行任何绑定操作,消息传递时,routekey必须完全匹配才会被队列接收,否则该消息会被抛弃.
2.Topic Exchange
- 所有发送到Topic Exchange的消息被转发到所有关心routekey中指定topic的queue上
- exchange将routekey和某topic进行模糊匹配,此时队列需要绑定一个topic
- 可以使用通配符进行模糊匹配
- “#”匹配一个或多个词
- “*”匹配一个词
- 例如:”log.#”能够匹配到”log.INF.oa”
- 例如:”log.*”能够匹配到”log.conf”
3. fanout exchange
- 不处理路由键,只需要简单的将队列绑定到交换机上
- 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
- fanout交换机转发消息是最快的
4. Queue
- 消息队列,实际存储消息数据
- Durability:是否持久化,Durable:是,Transient:否
- Auto delete:如选yes, 代表当最后一个监听被移除之后,该queue会自动被删除
5. VirtualHost
- 虚拟地址,用于进行逻辑隔离,最上层的消息路由
- 一个 virtual host 里面可以有若干个exchange和queue
- 同一个virtual host里面不能有相同名称的exchange或queue
三 消息生产与消费
- ConnrctionFactory:获取连接工厂
- Connection:一个连接
- Channel:数据通信信道,可发送和接收消息
- Queue:具体的消息存储队列
- Producer&Consumer 生产和消费者
四 可靠性投递
- 消息信息落库,对消息状态进行记录
- 消息的延迟投递,做二次确认,回调检查
- 消息的延迟投递,做二次确认,回调检查
五 消息确认机制
1. Confirm
- 消息确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答.
- 生产者进行接收应答,用来确认这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障
- 只能说明投递成功,不能说明消费者接收成功
2. Return消息机制
- Return Listener用于处理一些不可路由的消息
- 如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,就需要用到Return Listener
- 关键配置项:Mandatory:如果为true,监听器接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息
六 消费端限流
- Rabbitmq提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置qos的值)未被确认前,不进行消费新的消息.
- java 方法:
void BasicQos(unit pregetchSize,ushort prefetchCount,bool global);
- prefetchSize:0 (单条消息消费的大小限制,一般设为:0不限制)
- prefetchCount:N (一般设置:1,会告诉rabbitmq不要同时给一个消费者推送多余N个消息,即一旦有N给消息还没有ack,则该consumer将block掉,直到有消息ack)
- global:true/false (是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别)
- 注意:prefetchSize和global这两项,rabbitmq没有实现,prefetchCount在no_ask=false的情况下生效,机在自动应答的情况下这两个值是不生效的.
七 消费端ack与重回队列
- 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿
- 如果由于服务器宕机等严重问题,那我们就需要手工进行ack保障消费端消费端消费成功
- 消费端重回队列是为了对没有处理成功的消息,把消息重新递给broker
- 一般我们在实际应用中,都会关闭重回队列,也就是设置为false
八 TTL队列/消息
- 在消息发送时可以进行指定消息的过期时间
- 队列的过期时间,从消息进入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除
九 死信队列(DLX,Dead-Letter-Exchange)
特点
- 利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个exchange,这个exchange就是DLX
- DLX也是一个正常的exchange,和一般的exchange没有区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性.
- 当这个队列中有死信时,rabbitmq就会自动的将这个消息重新发布到设置的exchange上去,进而被路由到另一个队列.
- 可以监听这个队列中消息做相应的处理
消息变成死信的情况
- 消息被拒绝(basic.reject/basic.nack)并且requeue=false
- 消息TTL过期
- 队列达到最大长度
死信队列设置:
- 进行正常声明交换机,队列,绑定,只不过我们需要在队列加上一个参数即可:arguments.put(“x-dead-letter-exchange”,”dlx.exchange”);
十 Spring Cloud Stream 整合
- barista接口:定义通道类型和通道名称,通道名称是作为配置用,通道类型则决定了app会使用者一通道进行发送消息还是从中接收消息
- @Ouput:输出注解,用于定义发送消息接口
- @Input:输入注解,用于定义消息的消费者接口
- 缺点:不能实现可靠性的投递,会存在少量消息丢失的问题
消息使用
1. 事务机制
- 基本使用
- RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback()
- txSelect用于将当前channel设置成transaction模式
- txCommit用于提交事务
- txRollback用于回滚事务
- 在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了
- 使用说明地址:
https://www.jianshu.com/p/801456df3930
1
2
3channel.txSelect();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.txCommit();
- 缺点和解决方案
- 在并发量大的情况在rabbitmq事务会非常的占用资源
- 解决方法:
- 采用类似可靠性投递的机制,进行补偿投递
- 操作的数据源要统一,也就是业务操作db1和消息投递的db2使用同一个数据源
制定扩展
- 插件搜索下载地址:
http://www.rabbitmq.com/community-plugins.html
- 延迟队列插件
- 延迟插件名称:rabbitmq_delayed_message_exchange
- 将
***.ez
放入{rabbitmq_server}/plugins
目录下 - 重命名插件名称
rabbitmq_delayed_message_exchange-0.0.1.ez
- 启动插件
1
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- ps: 关闭指定插件的命令
1
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
Github地址
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Chc-个人数据程序主页!