RabbitMQ简介

RabbitMQ是一款开源的,Erlang编写的,基于AMQP协议的消息中间件。RabbitMQ基本概念包括。

  • Broker: 简单来说就是消息队列服务器实体
  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列
  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来
  • Routing Key: 路由关键字,exchange根据这个关键字进行消息投递
  • VHost: vhost 可以理解为虚拟 broker ,即 mini-RabbitMQ server。其内部均含有独立的 queue、exchange 和 binding 等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost 范围的用户控制。当然,从 RabbitMQ 的全局角度,vhost 可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost 中)。
  • Producer: 消息生产者,就是投递消息的程序
  • Consumer: 消息消费者,就是接受消息的程序
  • Channel: 消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

image-20210813214711614

AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了 Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。

image-20210813214913753

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:directfanouttopicheaders 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型:

  • direct

image-20210813215158785

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

  • fanout

image-20210813215321613

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

  • topic

image-20210813215643038

topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配不多不少一个单词。

RabbitMQ安装

Linux

1
sudo apt-get install rabbitmq-server

启用管理控制台

1
sudo rabbitmq-plugins enable rabbitmq_management

默认用户名、密码是 guest / guest。默认情况下,guest用户是无法通过远程访问的,只能通过localhost访问。

可以通过下面的命令添加一个用户

1
2
3
4
5
6
7
8
#第一步:添加 admin 用户并设置密码
sudo rabbitmqctl add_user admin 123456

#第二步:添加 admin 用户为administrator角色
sudo rabbitmqctl set_user_tags admin administrator

#第三步:设置 admin 用户的权限,指定允许访问的vhost以及write/read
sudo rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"	

0813

RabbitMQ登录后的界面,如下图所示。

image-20210813173419046

RabbitMQ基本操作

RabbitMQ可以通过web管理控制台进行exchange,队列,绑定等基本的操作。

创建exchange

在控制台Exchanges tab页下方的Add a new exchange 来创建 exchange。

image-20210813220405054

  • name:交换器名称
  • Type:类型,即前面所说的direct、fanout、topic、headers
  • Durability:是否持久化
  • Auto delete:是否自动删除,如果设置为Yes,则该exchange将在满足下面条件的情况下自动删除:至少有一个队列或exhange绑定到了该exchange,并且之后所有的队列和exchange都解除了绑定。
  • Internal:如果设置为Yes,客户端将无法直接使用这个exchange。只能在exchange之间进行使用。
  • Arguments:exchange相关的参数

创建队列

08132

创建队列方式跟创建exchange类似,其中几个参数的含义:

image-20210813224046736

Name:队列名称

Durability:是否持久化

Auto delete:是否自动删除,其含义跟exchange的含义类似。

Arguments:参数,可以指定队列的一些参数。下面Add后面是一些预设的参数,点击后面的问号会提示参数的含义。

绑定

点击一个exchange,在页面中 Add binging from this exchange 部分进行绑定,可以将exchange与队列绑定,也可以将exchange与exchange绑定。

08133

上面的这些操作都可以使用客户端的接口来进行,RabbitMQ提供可丰富的客户端接口,包括Java,Python,Rest,PHP等等。可以通过这个页面查看。

Clients Libraries and Developer Tools — RabbitMQ

RabbitMQ的Java客户端

连接RabbitMQ

1
2
3
4
5
6
7
8
9
ConnectionFactory factory = new ConnectionFactory();

factory.setUsername(USER);
factory.setPassword(PASS);
factory.setHost(HOST);
factory.setVirtualHost(VHOST);
factory.setPort(PORT);

Connection conn = factory.newConnection();

也可以使用URI的方式来连接:

1
2
3
ConnectionFactory factory = new ConnectionFactory(); 
factory.setUri("amqp://userName:password@ipAddress:portNumber/virtualHost"); 
Connection conn = factory.newConnection();

创建好Connection之后,就可以创建Channel了:

1
Channel channel - conn.createChannel();

Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程间共享,应用程序应该为每一个线程开辟一个 Channel。

使用交换器和队列

交换器和队列是 AMQP 中 high-level 层面的构建模块,应用程序需确保在使用它们的时候就已经存在了,在使用之前需要先声明(declare)它们。

下面的代码展示了怎样声明交换器和队列,并绑定他们:

1
2
3
4
5
String exchangeName = "test.q2";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);

String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, "test2");

exchangeDeclare方法:

exchangeDeclare 有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。

1
2
3
4
Exchange.DeclareOk exchangeDeclare(String exchange, 
                   String type, boolean durable, 
                   boolean autoDelete, boolean internal, 
                   Map<String, Object> arguments) throws IOException;

这个方法的返回值是 Exchange.DeclareOK,用来标识成功声明了一个交换器。各个参数详细说明如下所述。

  • exchange:交换器的名称。
  • type:交换器的类型,常见的如 fanout、direct、topic,详情参见 2.1.4 节。
  • durable:设置是否持久化。durable 设置为 true 表示持久化,反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
  • autoDelete:设置是否自动删除。autoDelete 设置为 true 则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。注意不能错误地把这个参数理解为:“当与此交换器连接的客户端都断开时,RabbitMQ 会自动删除本交换器”。
  • internal:设置是否是内置的。如果设置为 true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
  • argument:其他一些结构化参数,比如 alternate-exchange。

这里的操作跟使用控制台进行操作的效果是一致的。同样的道理,队列的声明可以使用queueDeclare方法来操作。

queueDeclare方法:

1
2
3
4
5
Queue.DeclareOk queueDeclare(String queue, 
                             boolean durable, 
                             boolean exclusive, 
                             boolean autoDelete,
                             Map<String, Object> arguments) throws IOException;

不带任何参数的 queueDeclare 方法默认创建一个由 RabbitMQ 命名的(类似这种amq.gen-LhQz1gv3GhDOv8PIDabOXA 名称,这种队列也称之为匿名队列)、排他的、自动删除的、非持久化的队列。其他参数的含义:

  • queue:队列的名称。
  • durable:设置是否持久化。为 true 则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
  • exclusive:设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列;“首次”是指如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
  • autoDelete:设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为:“当连接到此队列的所有客户端断开时,这个队列自动删除”,因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列
  • arguments:设置队列的其他一些参数,如 x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-deadletter-routing-key、x-max-priority 等。

queueBind方法:

queueBind方法将队列和交换器绑定到一起:

1
2
3
4
 Queue.BindOk queueBind(String queue, 
                        String exchange, 
                        String routingKey, 
                        Map<String, Object> arguments) throws IOException;
  • queue:队列名称;
  • exchange:交换器的名称;
  • routingKey:用来绑定队列和交换器的路由键;
  • argument:定义绑定的一些参数。

我们也可以使用exchangeBind方法将交换器与交换器进行绑定,其使用方法与队列绑定的方法基本一致。

消息发送

一个最简单的消息发送的代码样例如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
try {
	Connection conn = BaseMq.getConnection();
    Channel channel = conn.createChannel();

    String message = "hello";
    channel.basicPublish("test", "test", null, message.getBytes());

    channel.close();
    conn.close();
} catch (Exception ex) {
	ex.printStackTrace();
}

所有发送消息的方法,最终都是调用的下面这个方法:

1
2
3
4
5
6
void basicPublish(String exchange, 
                  String routingKey, 
                  boolean mandatory, 
                  boolean immediate, 
                  BasicProperties props, 
                  byte[] body) throws IOException;
  • exchange:交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到 RabbitMQ 默认的交换器中。
  • routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中。
  • props:消息的基本属性集,其包含 14 个属性成员,分别有 contentType、contentEncoding、headers(Map<String,Object>)、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。其中常用的几种都在上面的示例中进行了演示。
  • byte[] body:消息体(payload),真正需要发送的消息。
  • mandatory 和 immediate :设置mandatory和immediate标记。

消息消费

RabbitMQ 的消费模式分两种:推(Push)模式和拉(Pull)模式。推模式采用 Basic.Consume进行消费,而拉模式则是调用 Basic.Get 进行消费。

推(Push)模式:

在推模式中,可以通过持续订阅的方式来消费消息,使用到的相关类有:

1
2
com.rabbitmq.client.Consumer;
com.rabbitmq.client.DefaultConsumer; 

接收消息一般通过实现 Consumer 接口或者继承 DefaultConsumer 类来实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
try {
    Connection conn = BaseMq.getConnection();
    Channel channel = conn.createChannel();

    channel.basicConsume("test.fanout.q2", false, new DefaultConsumer(channel) {

    	@Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {
             // no work to do
             System.out.println(consumerTag);
             System.out.println("MESSAGE: " + new String(body));

             long deliveryTag = envelope.getDeliveryTag();

             channel.basicAck(deliveryTag, false);
         }
     });
} catch (Exception ex) {
     ex.printStackTrace();
}

basicConsume参数最全的方法如下:

1
2
3
4
5
6
 String basicConsume(String queue,
                     boolean autoAck, 
                     String consumerTag, 
                     boolean noLocal, 
                     boolean exclusive, 
                     Map<String, Object> arguments, Consumer callback) throws IOException;

其对应的参数说明如下所述。

  • queue:队列的名称;
  • autoAck:设置是否自动确认。建议设成 false,即不自动确认;
  • consumerTag:消费者标签,用来区分多个消费者;
  • noLocal:设置为 true 则表示不能将同一个 Connection 中生产者发送的消息传送给这个 Connection 中的消费者;
  • exclusive:设置是否排他;
  • arguments:设置消费者的其他参数;
  • callback:设置消费者的回调函数。用来处理 RabbitMQ 推送过来的消息,比如DefaultConsumer,使用时需要客户端重写(override)其中的方法。

拉(Pull)模式:

通过 channel.basicGet 方法可以单条地获取消息,其返回值是 GetRespone。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
try {
    Connection conn = BaseMq.getConnection();
    Channel channel = conn.createChannel();

    GetResponse resp = channel.basicGet("testq", false);
    String message = new String(resp.getBody());

    System.out.println("message: " + message);

    // 如果设置为非自动确认,需要使用basicAck进行确认。
    channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);

}catch (Exception ex){
    ex.printStackTrace();
}

其中 queue 代表队列的名称,如果设置 autoAck 为 false,那么同样需要调用channel.basicAck 来确认消息已被成功接收。

消息确认

为了保证消息从队列可靠地达到消费者,RabbitMQ 提供了消息确认机制(message acknowledgement)。消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false时,RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当 autoAck 等于 true 时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

RabbitMQ高级特性

mandatory和immediate

mandantory

当 mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当 mandatory 参数设置为 false 时,出现上述情形,则消息直接被丢弃。

那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用channel.addReturnListener 来添加 ReturnListener 监听器实现。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
try {
    Connection conn = BaseMq.getConnection();
    Channel channel = conn.createChannel();

    channel.basicPublish("test", "unkonwnKey", true,
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        "mandatory test".getBytes());

     channel.addReturnListener(new ReturnListener() {
         @Override
         public void handleReturn(int replyCode, String replyText, String exchange,
                                  String routingKey, AMQP.BasicProperties properties,
                                  byte[] body) throws IOException {

             String message = new String(body);
             System.out.println("returned message: " + message);

             System.out.println("replyCode: " + replyCode);
             System.out.println("replyText: " + replyText);
         }
     });
} catch (Exception ex) {
    ex.printStackTrace();
}

immediate

当 immediate 参数设为 true 时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息将不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者。

概括来说,mandatory 参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回给生产者。immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立刻投递;如果所有匹配的队列上都没有消费者,则直接将消息返还给生产者,不用将消息存入队列而等待消费者了。

RabbitMQ 3.0 版本开始去掉了对 immediate 参数的支持,对此 RabbitMQ 官方解释是:

immediate 参数会影响镜像队列的性能,增加了代码复杂性,建议采用 TTL 和 DLX 的方法替

过期时间(TTL)

TTL,Time to Live 的简称,即过期时间。RabbitMQ 可以对消息队列设置 TTL。

目前有两种方法可以设置消息的 TTL。

  • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。

  • 第二种方法是对消息本身进行单独设置,每条消息的 TTL 可以不同。

如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的 TTL 值时,就会变成“死信”(Dead Message)。

通过队列属性设置消息 TTL 的方法是在 channel.queueDeclare 方法中加入x-message-ttl 参数实现的,这个参数的单位是毫秒。也可以在管理控制台设置队列的过期时间。

1
2
3
Map<String, Object> argss = new HashMap<String, Object>(); 
argss.put("x-message-ttl",6000); 
channel.queueDeclare(queueName, durable, exclusive, autoDelete, argss);

如果不设置 TTL,则表示此消息不会过期;如果将 TTL 设置为 0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。

针对每条消息设置 TTL 的方法是在 channel.basicPublish 方法中加入 expiration的属性参数,单位为毫秒。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
Connection conn = BaseMq.getConnection();
Channel channel = conn.createChannel();

String message = "ttl test.";

AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder()
                    .deliveryMode(2)        // 持久化
                    .expiration("30000")    // ttl = 30秒,30000毫秒
                    .build();

channel.basicPublish("test", "test", true, prop, message.getBytes());

死信队列

DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之为死信队列。

消息变成死信一般是由于以下几种情况:

  • 消息被拒绝(Basic.Reject/Basic.Nack),并且设置 requeue 参数为 false;
  • 消息过期;
  • 队列达到最大长度

DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。

通过在 channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数来为这个队列添加 DLX。

1
2
3
4
5
6
 channel.exchangeDeclare("exchange.dlx", "direct");

 Map<String, Object> params = new HashMap<>();
 params.put("x-dead-letter-exchange", "exchange.dlx");

 channel.queueDeclare("test.q3", false, false, false, params);

下面是一个例子,一个正常的消息队列,超时时间设置为10秒,如果超时,则会转发到死信队列中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Connection conn = BaseMq.getConnection();
Channel channel = conn.createChannel();

// 正常的交换器和队列
channel.exchangeDeclare("exchange.dlx", BuiltinExchangeType.DIRECT);
channel.queueDeclare("queue.dlx", true, false, false, null);
channel.queueBind("queue.dlx", "exchange.dlx", "routingKey");

// 定义死信队列
channel.exchangeDeclare("exchange.t1", BuiltinExchangeType.FANOUT, true);

Map<String, Object> params = new HashMap<>();
params.put("x-message-ttl", 15000);
params.put("x-dead-letter-exchange", "exchange.dlx");
params.put("x-dead-letter-routing-key", "routingKey");
channel.queueDeclare("queue.t1", false, false, false, params);
channel.queueBind("queue.t1", "exchange.t1", "");

// 测试消息
channel.basicPublish("exchange.t1", "", PERSISTENT_TEXT_PLAIN, "test".getBytes());

channel.close();
conn.close();

image-20210817142157199

可以通过web控制台查看消息转到死信队列的效果(下图为gif动图,queue.t1的消息10秒后过期,会自动进入死信队列queue.dlx中)。

0817

延迟队列

延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

在 AMQP 协议中,或者 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过前面所介绍的 DLX 和 TTL 模拟出延迟队列的功能。

优先级队列

优先级队列,顾名思义,具有高优先级的队列具有高的优先权,优先级高的消息具备优先被消费的特权。可以通过设置队列的 x-max-priority 参数来实现。

image-20210817143244602

或者使用客户端代码:

1
2
3
4
Map<String, Object> params = new HashMap<>();
params.put("x-max-priority", 10);
        
channel.queueDeclare("queue.pri", true, false, false, params);

优先队列创建完成后,在web端查看,会出现“pri”的标记。

image-20210817143603063

下面是一段测试队列优先级属性的代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
	// 测试优先级,假定队列queue.pri已经与交换器exchange.pri绑定好
    public static void main(String[] args) throws Exception {
        produce();
        Thread.sleep(1000);
        consume();
    }

    private static void produce() throws Exception {
        Connection conn = BaseMq.getConnection();
        Channel channel = conn.createChannel();

        // 发布10条带优先级的消息,优先级分别为1 ~ 10
        for (int i = 0; i < 10; i++) {
            AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder().priority(i).build();
            String message = "priority message, pri = " + i;
            channel.basicPublish("exchange.pri", "pri", prop, message.getBytes());
        }

        channel.close();
        conn.close();
    }

    private static void consume() throws Exception {
        Connection conn = BaseMq.getConnection();
        Channel channel = conn.createChannel();

        channel.basicConsume("queue.pri", false, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) {
                System.out.println(new String(body));
            }
        });
    }

消息持久化

设置了队列和消息的持久化,持久化的消息在系统重启之后消息依然存在。单单只设置队列持久化,重启之后消息会丢失;单单只设置消息的持久化,重启之后队列消失,继而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无意义。

下面是一个例子,展示了消息持久化的设置。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public static void main(String[] args) throws Exception {

        Connection connection = BaseMq.getConnection();
        Channel channel = connection.createChannel();

        // 发布两条消息,一条持久化,一条非持久化
        // TEXT_PLAIN 和 PERSISTENT_TEXT_PLAIN 分别是预定义好的属性,TEXT_PLAIN是非持久化的,PERSISTENT_TEXT_PLAIN是持久化的。
        // /** Content-type "text/plain", deliveryMode 1 (nonpersistent), priority zero */
        //    public static final BasicProperties TEXT_PLAIN =
        //        new BasicProperties("text/plain",
        //                            null,
        //                            null,
        //                            1,
        //                            0, null, null, null,
        //                            null, null, null, null,
        //                            null, null);
        //
        //    /** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
        //    public static final BasicProperties PERSISTENT_TEXT_PLAIN =
        //        new BasicProperties("text/plain",
        //                            null,
        //                            null,
        //                            2,
        //                            0, null, null, null,
        //                            null, null, null, null,
        //                            null, null);
        channel.basicPublish("test", "test", MessageProperties.PERSISTENT_TEXT_PLAIN, 
                             "PERSISTENT_TEXT_PLAIN".getBytes());
    
        channel.basicPublish("test", "test", MessageProperties.TEXT_PLAIN,
                             "TEXT_PLAIN".getBytes());

        channel.close();
        connection.close();

        // 。。。 RabbitMQ 服务器重启
        Thread.sleep(60000);

        Connection connection1 = BaseMq.getConnection();
        Channel channel1 = connection1.createChannel();

        // RabbitMQ服务器重启后,应该只能消费到内容为“PERSISTENT_TEXT_PLAIN”的消息
        channel1.basicConsume("test", new DefaultConsumer(channel1){
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) {
                System.out.println(new String(body));
            }
        });

        channel1.close();
        connection1.close();
    }

可以将所有的消息都设置为持久化,但是这样会严重影响 RabbitMQ 的性能(随机)。写入磁盘的速度比写入内存的速度慢得不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡。

消息分发

当 RabbitMQ 队列拥有多个消费者时,队列收到的消息将以轮询(round-robin)的分发方式发送给消费者。这样可能造成处理能力强的消费者空闲,处理能力弱的消费者任务繁重。

消费者可以通过使用channel.basicQos(int prefetchCount)这个方法,来限制信道上消费者所保持的最大未确认消息数量。

消息顺序性

消息的顺序性是指消费者消费到的消息和发送者发布的消息的顺序是一致的。举个例子,不考虑消息重复的情况,如果生产者发布的消息分别为 msg1、msg2、msg3,那么消费者必然也是按照 msg1、msg2、msg3 的顺序进行消费的。

在不使用任何 RabbitMQ 的高级特性,也没有消息丢失、网络故障之类异常的情况发生,并且只有一个消费者的情况下,最好也只有一个生产者的情况下可以保证消息的顺序性。如果有多个生产者同时发送消息,无法确定消息到达 Broker 的前后顺序,也就无法验证消息的顺序性。

消息确认

  • 发送方确认模式

将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。

一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一 ID)。

如果 RabbitMQ 发生内部错误从而导致消息丢失,会发送一条nack(notacknowledged,未确认)消息。

发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。

  • 接收方确认机制

消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ 才能安全地把消息从队列中删除。

这里并没有用到超时机制,RabbitMQ 仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证数据的最终一致性;

下面罗列几种特殊情况

如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)

如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。

如何保证消息不被重复消费

先说为什么会重复消费:正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除;

但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者。

针对以上问题,一个解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;

比如:在写入消息队列的数据做唯一标示,消费消息时,根据唯一标识判断是否消费过;

假设你有个系统,消费一条消息就往数据库里插入一条数据,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下是否已经消费过了,若是就直接扔了,这样不就保留了一条数据,从而保证了数据的正确性。

消息丢失

消息不可靠的情况可能是消息丢失,劫持等原因;丢失又分为:生产者丢失消息消息列表丢失消息消费者丢失消息

生产者丢失消息

从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;

transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;

confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

消息队列丢数据

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。

这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。那么如何持久化呢?

这里顺便说一下吧,其实也很容易,就下面两步:

  • 将queue的持久化标识durable设置为true,则代表是一个持久的队列

  • 发送消息的时候将deliveryMode=2

这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据

消费者丢失消息:

消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!

消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;如果这时处理消息失败,就会丢失该消息;

解决方案:处理消息成功后,手动回复确认消息

RabbitMQ集群

RabbitMQ是比较有代表性的,因为是基于主从(非分布式)做高可用性的,我们就以 RabbitMQ为例子讲解第一种 MQ 的高可用性怎么实现。RabbitMQ 有三种模式:单机模式普通集群模式镜像集群模式

单机模式

就是 Demo 级别的,一般就是你本地启动了玩玩儿的?没人生产用单机模式。

普通集群模

意思就是在多台机器上启动多个RabbitMQ 实例,每个机器启动一个。你创建的queue,只会放在一个RabbitMQ实例上,但是每个实例都同步queue 的元数据(元数据可以认为是queue的一些配置信息,通过元数据,可以找到queue 所在实例)。你消费的时候,实际上如果连接到了另外一个实例,那么那个实例会从queue所在实例上拉取数据过来。这方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个queue的读写操作。

镜像集群模式

这种模式,才是所谓的RabbitMQ的高可用模式。

跟普通集群模式不一样的是,在镜像集群模式下,你创建的queue,无论元数据还是queue里的消息都会存在于多个实例上,就是说,每个RabbitMQ节点都有这个queue的一个完整镜像,包含queue的全部数据的意思。然后每次你写消息到queue的时候,都会自动把消息同步到多个实例的queue上。RabbitMQ有很好的管理控制台,就是在后台新增一个策略,这个策略是镜像集群模式的策略,指定的时候是可以要求数据同步到所有节点的,也可以要求同步到指定数量的节点,再次创建queue的时候,应用这个策略,就会自动将数据同步到其他的节点上去了。这样的话,好处在于,你任何一个机器宕机了,没事儿,其它机器(节点)还包含了这个queue的完整数据,别的consumer都可以到其它节点上去消费数据。坏处在于,第一,这个性能开销也太大了吧,消息需要同步到所有机器上,导致网络带宽压力和消耗很重!RabbitMQ 一个queue的数据都是放在一个节点里的,镜像集群下,也是每个节点都放这个queue的完整数据。

参考

Clients Libraries and Developer Tools — RabbitMQ