Skip to content

1. 核心概念

生产者 交换机 队列 消费者

几种模式 img

核心部分 img

Broker: 接收和分发消息的应用,RabbitMQ Server就是Message Broker

Virtual host: 出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时,可以划分出多个host,每个用户在自己的host创建exchange/queue等

Connection: publisher/consumer和broker之间的TCP连接

Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel进行通讯,AMQP method包含了channel id帮助客户端和message broker识别channel,,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销

Exchange:message 到达broker的第一站,根据分发规则,匹配查询表中的routing key,分发消息到queue中去。常用的类型有:direct(point-to-point),topic(publish-subscribe)and fanout (multicast)

Queue: 消息最终被送到这里等待consumer取走 Binding: exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据

2.安装

官方文档进行安装

2.1 添加账户并开启

web界面

添加账户

1.创建账号

rabbitmqctl add_user admin 123456

2.设置用户角色

rabbitmqctl set_user_tags admin administrator

3.设置用户权限

shell
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p / admin ".*"   ".*"  ".*"

4.开启web管理界面插件

rabbitmq-plugins enable rabbitmq_management

开启后在端口15672进行访问

3. 消息队列模式

3.1 hello

3.2 工作模式 (work queue)

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

注意:若先启动生产者,再启动消费者,那么第一个启动的消费者将接受所有消息。 所以一般是先启动消费者,在启动生产者。 img

3.3 消息应答

3.3.1 概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,bbitmq引入消息应答机制,消息应答就是:消费者在接 收到消息并且处理该消息之后,告诉rabbitmq它己经处理了,rabbitmq可以把该消息删除了。

3.3.2 自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者channel关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用。

3.3.3 手动应答

Channel..basicAck(用于肯定确认) RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了

Channel.basicNack(用于否定确认)

Channel.basicReject(用于否定确认)

与Channel..basicNack相比少一个参数,批量应答     不处理该消息了直接拒绝,可以将其丢弃了

3.3.4 Multiple解释(批量应答)

手动应答的好处是可以批量应答并减少网络拥堵

channel.basicAck(deliveryTag, true);

true: 代表 multiple 批量应答。

批量应答:会应答信道中的所有消息。

信道:队列向消费者发送的通道

3.3.5 消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失),导致消息 未发送ACK确认,RabbitMQ将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者 可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息。

3.4 RabbitMQ 持久化

3.4.1 概念

刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当RabbitMQ服务停掉以后消 息生产者发送过来的消息不丢失。默认情沉下RabbitMQ退出或由于某种原因崩溃时,它忽视队列 和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标 记为持久化。

3.4.2 队列持久化

boolean durable=true;

channel.queueDecalre(ACK_QUEUE_NAME,durable, exclusive: false, autoDelete:false, arguments: null)

注意:队列持久化只是持久化队列,里面的消息不回被持久化。

3.4.3 消息持久化

要想让消息实现持久化需要在消息生产者修改代码,MessageProperties..PERSISTENT_TEXT_PLAIN添 加这个属性。

img

将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉RabbitMQ将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没 有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要 更强有力的持久化策略,参考后边课件发布确认章节

3.4.3 不公平分发

在最开始的时候我们学习到RāobitMQ分发消息采用的轮训分发,但是在某种场景下这种策略并不是 很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间 处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以设置参数channel.basicQos(1);

int prefetchCount=1;

channel.basicQos(prefetchCount);

注意:这里确认方式改为手动确认才比较明显的显示出效果,自动确认会只执行一次就停止了

3.4.4 预取值

队列预先把消息放到信道中的个数,

img

img

官方文档,如果为零,那么信道中则无限制。

4.发布确认

4.1 发布确认原理

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的 消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broke: 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队 列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传 给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置 basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。 confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信 道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调 方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消 息,生产者应用程序同样可以在回调方法中处理该nack消息。

4.1.2 单个发布确认

这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它 被确认发布,后续的消息才能继续发布,waitForConfirms0rDie(1ong)这个方法只有在消息被确认 的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。 这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会 阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某 些应用程序来说这可能已经足够了。

img

耗时测试,发送了一百条数据

img

耗时 134ms,

没有开启的耗时,9ms

img

4.1.3 批量发布确认

上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地 提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现 问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种 方案仍然是同步的,也一样阻塞消息的发布。

img

4.1.4 异步发布确认

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功, 下面就让我们来详细讲解异步确认是怎么实现的。

img

4.1.5 如何处理异步未确认的消息

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列, 比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。

4.1.6 发布确认代码实例

java

package com.zhaoyubin.confirmPublish;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.zhaoyubin.util.RabbitMqUtil;

import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;

/**
 * 单个发布确认
 */

public class ConfirmProducer {
    private static final String  QUEUE_NAME_SINGLE="singleConfirm";
    private static final String  QUEUE_NAME_BATCH="batchConfirm";
    private static final String  QUEUE_NAME_ASYNC="asyncConfirm";

    public static void main(String[] args) throws Exception{

        asyncConfirm();     //异步发布确认
    }

    /**
     * 单个发布确认
     * @throws Exception
     */
    public static void singleConfirm() throws  Exception{
        Channel channel = RabbitMqUtil.getConnect();

        //开启消息确认
        channel.confirmSelect();

        channel.queueDeclare(QUEUE_NAME_SINGLE,true,false,false,null);

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < 1000; i++) {
            channel.basicPublish("",QUEUE_NAME_SINGLE, MessageProperties.PERSISTENT_TEXT_PLAIN,"singleconfirm".getBytes());

            //返回确认结果,此过程是阻塞的
            boolean b = channel.waitForConfirms();
            if(b){
                System.out.println("第"+(i+1)+"条消息发送成功");
            }
        }
        long endTime = System.currentTimeMillis();

        System.out.println("====>总耗时:"+(endTime-startTime)+"ms");
    }


    /**
     * 批量发布确认, 对比于单个消息确认只是手动的加了个数量判断,
     * @throws Exception
     */
    public static void batchConfirm() throws  Exception{
        Channel channel = RabbitMqUtil.getConnect();

        //开启消息确认
        channel.confirmSelect();

        channel.queueDeclare(QUEUE_NAME_BATCH,true,false,false,null);

        long startTime = System.currentTimeMillis();

        int batchSize=100;        //批量确认消息大小
        int waitConfirmCount=0; //未确认消息数量
        for (int i = 0; i < 1000; i++) {
            channel.basicPublish("",QUEUE_NAME_BATCH, MessageProperties.PERSISTENT_TEXT_PLAIN,"batchconfirm".getBytes());

            waitConfirmCount++;
            if (waitConfirmCount==batchSize){
                //返回确认结果,此过程是阻塞的
                boolean b = channel.waitForConfirms();
                waitConfirmCount=0;
            }

        }
        long endTime = System.currentTimeMillis();

        System.out.println("====>总耗时:"+(endTime-startTime)+"ms");
    }
    /**
     * 异步发布确认,效果最好,比上面两个个
     * @throws Exception
     */
    public static void asyncConfirm() throws  Exception{
        Channel channel = RabbitMqUtil.getConnect();
        //开启消息确认
        channel.queueDeclare(QUEUE_NAME_ASYNC,true,false,false,null);
        channel.confirmSelect();

        //线程安全hashmap

        ConcurrentSkipListMap<Long,String> concurrentSkipListMap = new ConcurrentSkipListMap<>();



        // 开启消息监听,监听异步返回的结果
        // multiple true:代表批量确认
        // false:代表单个确认,
        // 经过测试,如果 发布的速度很快那么他会是批量的,如果加一个1s延迟,那么他是单个确认的
        channel.addConfirmListener((deliveryTag,multiple)->{
            if(multiple){
                ConcurrentNavigableMap<Long, String> concurrentNavigableMap = concurrentSkipListMap.headMap(deliveryTag);
                concurrentNavigableMap.clear();
                concurrentSkipListMap.remove(deliveryTag);
            }else{
                concurrentSkipListMap.remove(deliveryTag+1);
            }
            //成功的回调函数
            System.out.println("成功了:deliveryTag:"+deliveryTag+",multiple:"+multiple);

        },( deliveryTag,  multiple)->{
            //失败的回调函数
            System.out.println("失败了:deliveryTag:"+deliveryTag+",multiple:"+multiple);
        });

        long startTime = System.currentTimeMillis();
        //循环发布消息
        for (int i = 0; i < 1000; i++) {
            String message="asyncMessage"+i;

            channel.basicPublish("",QUEUE_NAME_ASYNC,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

            //将消息记录到一个容器中
            concurrentSkipListMap.put(channel.getNextPublishSeqNo()-1, message);
        }

        TimeUnit.SECONDS.sleep(10);

        //结束时间
        long endTime = System.currentTimeMillis();
//        System.out.println("====>总耗时:"+(endTime-startTime)+"ms");
    }
}

5.交换机

在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消 费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式称为"发布/订阅”. 为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消息,第二个程序是消费者。其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘, 另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消费者

5.1.1 Exchange 概念

RabbitMQ消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,通常生产 者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来 自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消 息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

img

5.1.2 Exchanges 的类型

总共有以下类型:

直接direct),主题(topic),标题(headers),扇出(fanout)

5.1.3 无名exchange

在本教程的前面部分我们对exchange一无所知,但仍然能够将消息发送到队列。之前能实现的 原因是因为我们使用的是默认交换,我们通过空字符串∽进行标识。

img

5.2临时队列

之前的章节我们使用的是具有特定名称的队列(还记得hello和ack_queue吗?)。队列的名称我们 来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。 每当我们连接到Rabbit时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称 的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。

创建临时队列的方式:

String queueName=channel.queueDeclare().getQueue();

img

5.3 绑定

img

RouteingKey: 根据路由键 交换机可以选择把消息发给哪一个队列。

5.4 Fanout

Fanout 介绍

Fanout这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的所有队列中。系统中默认有些exchange类型

实战:

img

5.5 Rirect直接交换机

5.5.1 回顾

在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本 节我们将向其中添加一些特别的功能比方说我们只让某个消费者订阅发布的部分消息。例如我们只把 严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。 我们再次来回顾一下什么是bindings,.绑定是交换机和队列之间的桥梁关系。也可以这么理解: 队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey来表示也可称该参数为binding key, 创建绑定我们用代a码:channel..queueBind(queueName,EXCHANGE_NAME,"routingKey");绑定之后的 意义由其交换类型决定。

5.5.2 Direct Exchange 介绍

上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希 望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志 消息避免浪费磁盘空间。Fanout这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的 广播,在这里我们将使用direct这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的routingKey队列中去。

img

在上面这张图中,我们可以看到X绑定了两个队列,绑定类型是direct.队列Q1绑定键为orange, 队列Q2绑定键有两个:一个绑定键为black,另一个绑定键为green. 在这种绑定情况下,生产者发布消息到exchange上,绑定键为orange的消息会被发布到队列 Q1。绑定键为blackgreen和的消息会被发布到队列Q2,其他消息类型的消息将被丢弃。

5.5.3 多重绑定

img

当然如果exchange的绑定类型是direct,但是它绑定的多个队列的key如果都相同,在这种情 况下虽然绑定类型是direct但是它表现的就和fanout有点类似了,就跟广播差不多,如上图所示。

5.6 Topics

5.6.1 之前类型的问题

在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的faou止交换机,而是使用了direct交换机,从而有能实现有选择性地接收日志。尽管使用direct交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有info.base和info.ad小vantage,某个队列只想info.base的消息,那这个时候direct就办不到了。这个时候就只能使用topic类型

5.6.2 Topic的要求

发送到类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,它必须是一个单 词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd,yse","nyse.vmw", "quick.orange..rabbit'".这种类型的。当然这个单词列表最多不能超过255个字节。 在这个规则列表中,其中有两个替换符是大家需要注意的

#(井号) 可以替代零个或多个单词

5.6.3 Topic 匹配案例

img

6. 死信队列

6.1 死信的概念

先从慨念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理 解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息 进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。 应用场景:为了保证订单业务的消息数据不丢失,需要使用到RābbitMQ的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中.还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

6.2 死信的来源

消息TTL过期队列达到最大长度(队列满了,无法再添加数据到mq中)消息被拒绝(basic.reject或basic.nack)并且requeue=false

6.3 死信实战

6.3.1 代码架构图

img

消费者代码:

java
package com.zhaoyubin.deadLetter;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;
import com.zhaoyubin.util.RabbitMqUtil;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DeadConsumer1 {

    private final static String EXCHANGE_NAME="zhao.exchange.normal";
    private final static String EXCHANGE_NAME_DEAD="zhao.exchange.deadLetter";

    private final static String QUEUE_NAME="q_normal";
    private final static String QUEUE_NAME_DEAD="q_dead";


    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtil.getConnect();

        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(EXCHANGE_NAME_DEAD, BuiltinExchangeType.DIRECT);

        //创建普通队列多参
        Map<String,Object> queueArgs=new HashMap<>();
        queueArgs.put("x-dead-letter-exchange",EXCHANGE_NAME_DEAD);
        queueArgs.put("x-dead-letter-routing-key","deadRoutingKey");
        //设置过期时间,一般在生产方设置,这里只是说明一下有这个参数
        //queueArgs.put("x-message-ttl",100000);


        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,queueArgs);
        channel.queueDeclare(QUEUE_NAME_DEAD,false,false,false,null);

        //绑定队列
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"normalRoutingKey");
        channel.queueBind(QUEUE_NAME_DEAD,EXCHANGE_NAME_DEAD,"deadRoutingKey");

        channel.basicConsume(QUEUE_NAME,false,(consumerTag, message)->{
            //成功回调
            System.out.println("回调成功:手动拒绝");
            System.out.println(new String(message.getBody()));

            //手动拒绝消息,requeue – 如果被拒绝的消息应该重新排队而不是丢弃/死信,则为 true
            //在此,false代表进入死信
            channel.basicReject(message.

生产者代码

java
package com.zhaoyubin.deadLetter;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.zhaoyubin.util.RabbitMqUtil;

import java.io.IOException;

public class DeadProducer {
    private final static String EXCHANGE_NAME="zhao.exchange.normal";
    private final static String EXCHANGE_NAME_DEAD="zhao.exchange.deadLetter";

    private final static String QUEUE_NAME="q_normal";
    private final static String QUEUE_NAME_DEAD="q_dead";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtil.getConnect();

        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        //死信消息,设置TTL时间 单位ms
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
                .builder().expiration("10000").build();

        channel.basicPublish(EXCHANGE_NAME,"normalRoutingKey",basicProperties,"私信消息:".getBytes());
    }
}

7 延迟队列

7.1 延迟队列的概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

7.2 延迟队列使用场景

1.订单在十分钟之内未支付则自动取消

2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。

3.用户注册成功后,如果三天内没有登陆则进行短信提醒。

4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。

5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也,是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至干万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

7.6 延时队列优化

7.6.1 代码架构图

img

需要优化的问题

如果再来很多需求如 1分钟延迟,10分钟延迟,1小时延迟。那么会增加无数个延迟队列。

现在需要解决的是,在一个队列中实现多种延迟。

产生问题

shell
2022-09-05 13:20:10.672  INFO 25140 --- [nio-8080-exec-7] com.zhaoyubin.controller.Test            : /test/public/发送的消息20s公共消息队列
2022-09-05 13:20:11.681  INFO 25140 --- [nio-8080-exec-8] com.zhaoyubin.controller.Test            : /test/public/发送的消息10s公共消息队列
2022-09-05 13:20:30.678  INFO 25140 --- [ntContainer#0-1] com.zhaoyubin.consumer.TestConsumer      : 消费者接受的消息20s公共消息队列
2022-09-05 13:20:30.679  INFO 25140 --- [ntContainer#0-1] com.zhaoyubin.consumer.TestConsumer      : 消费者接受的消息10s公共消息队列

如果两条延迟消息同时发送,一个10s,一个20s,

但是如果20s 在 10 s前面发送,那么会先执行20s延迟的消息。

为了解决此问题,引入了插件

7.7 Rabbitmq 插件实现延迟队列

上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的TL,并使其在设置的TL时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。

7.7.1 安装延时队列插件

在官网上下载https:/www.rabbitmq.com/community-plugins.html,下载 rabbitmq_delayed_message_exchange插件,然后解压放置到RabbitMQ的插件目录。 进入RabbitMQ的安装目录下的plgins目录,执行下面命令让该插件生效,然后重启RabbitMQ

1.下载插件位置

Community Plugins — RabbitMQ

2.查看插件目录

shell
zhaoyubin@ubuntu:~$ sudo rabbitmq-plugins directories
[sudo] password for zhaoyubin: 
Listing plugin directories used by node rabbit@ubuntu
Plugin archives directory: /usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.10.7/plugins
Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@ubuntu-plugins-expand
Enabled plugins file: /etc/rabbitmq/enabled_plugins

3.查看插件命令

shell
zhaoyubin@ubuntu:/usr/lib/rabbitmq/plugins$ sudo rabbitmq-plugins list
Listing plugins with pattern ".*" ...
 Configured: E = explicitly enabled; e = implicitly enabled
 | Status: * = running on rabbit@ubuntu
 |/
[  ] rabbitmq_amqp1_0                  3.10.7
[  ] rabbitmq_auth_backend_cache       3.10.7
[  ] rabbitmq_auth_backend_http        3.10.7
[  ] rabbitmq_auth_backend_ldap        3.10.7
[  ] rabbitmq_auth_backend_oauth2      3.10.7
[  ] rabbitmq_auth_mechanism_ssl       3.10.7
[  ] rabbitmq_consistent_hash_exchange 3.10.7
[  ] rabbitmq_delayed_message_exchange 3.10.2
[  ] rabbitmq_event_exchange           3.10.7
[  ] rabbitmq_federation               3.10.7
[  ] rabbitmq_federation_management    3.10.7
[  ] rabbitmq_jms_topic_exchange       3.10.7
[E*] rabbitmq_management               3.10.7
[e*] rabbitmq_management_agent         3.10.7
[  ] rabbitmq_mqtt                     3.10.7
[  ] rabbitmq_peer_discovery_aws       3.10.7
[  ] rabbitmq_peer_discovery_common    3.10.7
[  ] rabbitmq_peer_discovery_consul    3.10.7
[  ] rabbitmq_peer_discovery_etcd      3.10.7
[  ] rabbitmq_peer_discovery_k8s       3.10.7
[  ] rabbitmq_prometheus               3.10.7
[  ] rabbitmq_random_exchange          3.10.7
[  ] rabbitmq_recent_history_exchange  3.10.7
[  ] rabbitmq_sharding                 3.10.7
[  ] rabbitmq_shovel                   3.10.7
[  ] rabbitmq_shovel_management        3.10.7
[  ] rabbitmq_stomp                    3.10.7
[  ] rabbitmq_stream                   3.10.7
[  ] rabbitmq_stream_management        3.10.7
[  ] rabbitmq_top                      3.10.7
[  ] rabbitmq_tracing                  3.10.7
[  ] rabbitmq_trust_store              3.10.7
[e*] rabbitmq_web_dispatch             3.10.7
[  ] rabbitmq_web_mqtt                 3.10.7
[  ] rabbitmq_web_mqtt_examples        3.10.7
[  ] rabbitmq_web_stomp                3.10.7
[  ] rabbitmq_web_stomp_examples       3.10.7

4.开启插件

sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange

注意:这里需要把后缀去掉。不然找不到

5.重启服务

sudo service rabbitmq-server restart

7.7.2 基于插件的架构

这里消息先是在exchange中等待的,超时了放到队列中

img

7.7.3 配置文件类代码

java
 //基于插件的交换机
    @Bean
    public CustomExchange delayPluginExchange(){
        Map<String,Object> args= new HashMap<>();
        args.put("x-delayed-type","direct");
        return  new CustomExchange(RabbitQueueNameEnum.QUEUE_PLUGIN.getExchangeName(),"x-delayed-message",true,false,args);
    }
  //声明一个基于插件的队列
    @Bean
    public Queue pluginQueue(){
        return QueueBuilder.durable(RabbitQueueNameEnum.QUEUE_PLUGIN.getQueueName()).build();
    }
        //绑定插件队列
    @Bean
    Binding bindingPluginQueue(@Qualifier("delayPluginExchange") CustomExchange  delayPluginExchange, @Qualifier("pluginQueue") Queue pluginQueue){

        return BindingBuilder
                .bind(pluginQueue)
                .to(delayPluginExchange)
                .with(RabbitQueueNameEnum.QUEUE_PLUGIN.getRoutingKey()).noargs();  //这里用的是noargs()

    }

生产者

java
   @GetMapping("/test/plugin/{time}/{message}")
    public String pluginPublic(@PathVariable Integer time,@PathVariable String message){
        log.info("/test/public/发送的消息{}",message);
        rabbitTemplate.convertAndSend(RabbitQueueNameEnum.QUEUE_PLUGIN.getExchangeName()
                ,RabbitQueueNameEnum.QUEUE_PLUGIN.getRoutingKey()
                ,message,msg->{msg.getMessageProperties().setDelay(time);       //这里用的是setDelay而不是setExpiration
                    return msg;
                });

        return message;
    }

8. 发布确认高级

在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢:

9.RabbitMq 其他知识点

9.1 幂等性

9.1.1 概念

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。 举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常, 此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱 了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误 立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等

9.1.2 消息重复消费

消费者在消费MQ中的消息时,MQ已把消息发送给消费者,消费者在给MQ返回ck时网络中断, 故MQ未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但 实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

9.1.3 解决思路

MQ消费者的幂等性的解决一般使用全局ID或者写个唯一标识比如时间戳或者UUID或者订单消费 者消费MQ中的消息也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消 息时用该id先判断该消息是否已消费过。

9.1.4 消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幕等性, 这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幕等性有两种操作:a. 唯一ID+指纹码机制,利用数据库主键去重,b.利用redis的原子性去实现

9.1.5 唯一id+指纹码机制

指纹码:我们的一些规侧或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

9.1.6 Redis原子性

利用redis.执行setnx命令,天然具有幂等性。从而实现不重复消费

9.2 优先级队列

9.2.1 使用场景

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tma商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用rdis来存放的定时轮询,大家都知道rdis只能用List做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用RabbitMQ进行改造和优化,如果发现是大客户的订单给一个相对北比较高的优先级,否则就是默认优先级。

9.2.2 架构图

img

9.2.3 如何添加

img

img

img

注意:队列设置优先级,消息也必须设置优先级才行,

先启动生产者,这样才能等到发送完后在会显示出优先级效果。

9.3 惰性队列

9.3.1 使用场景

RabbitMQ从3.6.0版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消 费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持 更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致 使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。 默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留 一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的 时间,也会阻塞队列的操作,进而无法接收新的消息。虽然RabbitMQ的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。

img

9.3.2 两种模式

队列具备两种模式:default和lazy。默认的为default模式,在3.6.0之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用channel..queueDeclare方法的时候在参数中设置,也可以通过 Policy的方式设置,如果一个队列同时使用这两种方式设置的话,那么Policy的方式具备更高的优先级。 如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。 在队列声明的时候可以通过"x-queue-mode"参数来设置队列的模式,取值为"default'和Iazy'。下面示 例中演示了一个惰性队列的声明细节: Map<String,Object>args new HashMap<String,Object>(); args.put("x-queue-mode","lazy"); channel.queueDeclare("myqueue",false,false,false,args);

9.3.3 内存开销对比

img

10 RabbitMQ集群

最后更新于: