在上一节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消费者(工作进程) 。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式 称为 ”发布/订阅” 。
为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消息,第二个程序是消费者。其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘,另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消费者
三、Exchanges交换机 RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列 。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange) ,交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
RabbitMQ-00000035 Exchanges 的类型 直接(direct) :处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 abc ,则只有被标记为 abc 的消息才被转发,不会转发 abc.def,也不会转发 dog.ghi,只会转发 abc。
主题(topic) :将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号 * 匹配不多不少一个词。因此 abc.# 能够匹配到 abc.def.ghi,但是 abc.* 只会匹配到 abc.def。
标题(headers) :不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 属性是一个键值对,可以是 Hashtable,键值对的值可以是任何类型。而 fanout,direct,topic 的路由键都需要要字符串形式的。
匹配规则 x-match 有下列两种类型:
x-match = all :表示所有的键值对都匹配才能接受到消息
x-match = any :表示只要有键值对匹配就能接受到消息
扇出(fanout) :不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout 交换机转发消息是最快的。
无名exchange:
1 在前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的 原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。
RabbitMQ-00000036 第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话
临时队列 之前的章节我们使用的是具有特定名称的队列(还记得 hello 和 ack_queue 吗?)。队列的名称我们来说至关重要,我们需要指定我们的消费者去消费哪个队列的消息。
每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称的队列 ,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连接,队列将被自动删除。
创建临时队列的方式如下:
1 String queueName = channel.queueDeclare().getQueue();
、 绑定 bindings 什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定
Fanout exchange(扇出交换机(发布订阅模式)) 一个发送,多个接受,发布/订阅模式
Fanout 介绍 Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播 到它知道的 所有队列中。系统中默认有些 exchange 类型
Fanout 实战 在这种模式下,发送者不需要声明队列,只需要声明交换机然后在每次发送中发送到交换机中。接收者方声明队列,绑定到交换机上,然后接收队列中的消息。
为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消息,第二个程序是消费者。其中启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘
Logs 和临时队列的绑定关系如下图
注意
先启动两个消费者再启动生产者。
生产者生产消息后,如果没有对应的消费者接收,则该消息是遗弃的消息
ReceiveLogs01 将接收到的消息打印在控制台
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class ReceiveLogs01 { private static final String EXCHANGE_NAME = "logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "" ); System.out.println("等待接收消息,把接收到的消息打印在屏幕........... " ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String (delivery.getBody(), "UTF-8" ); System.out.println("控制台打印接收到的消息" + message); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> {}); } }
ReceiveLogs02 把消息写出到文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class ReceiveLogs02 { private static final String EXCHANGE_NAME = "logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "" ); System.out.println("等待接收消息,把接收到的消息写到文件........... " ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String (delivery.getBody(), "UTF-8" ); File file = new File ("D:\\test\\rabbitmq_info.txt" ); FileUtils.writeStringToFile(file,message,"UTF-8" ); System.out.println("数据写入文件成功" ); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> {}); } }
EmitLog 发送消息给两个消费者接收:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class EmitLog { private static final String EXCHANGE_NAME = "logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout" ); Scanner sc = new Scanner (System.in); System.out.println("请输入信息" ); while (sc.hasNext()) { String message = sc.nextLine(); channel.basicPublish(EXCHANGE_NAME, "" , null , message.getBytes("UTF-8" )); System.out.println("生产者发出消息" + message); } } }
Direct exchange(直接交换机(路由模式)) 在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本节我们将向其中添加一些特别的功能——让某个消费者订阅发布的部分消息。例如我们只把严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
我们再次来回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解: 队列只对它绑定的交换机的消息感兴趣 。绑定用参数:routingKey 来表示也可称该参数为 binding key, 创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");
绑定之后的意义由其交换类型决定。
Direct 介绍 上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希 望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志 消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的 广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的 routingKey 队列中去。
RabbitMQ-00000042 在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
多重绑定 RabbitMQ-00000043 当然如果 exchange 的绑定类型是direct,但是它绑定的多个队列的 key 如果都相同 ,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了 ,就跟广播差不多,如上图所示。
Direct 实战 关系:
RabbitMQ-00000044 交换机:
RabbitMQ-00000045 c1消费者:绑定 disk 队列,routingKey 为 error
c2消费者:绑定 console 队列,routingKey 为 info、warning
当生产者生产消息到 direct_logs
交换机里,该交换机会检测消息的 routingKey 条件,然后分配到满足条件的队列里,最后由消费者从队列消费消息。
C1消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class ReceiveLogsDirect01 { private static final String EXCHANGE_NAME = "direct_logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "disk" ; channel.queueDeclare(queueName, false , false , false , null ); channel.queueBind(queueName, EXCHANGE_NAME, "error" ); System.out.println("等待接收消息..." ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String (delivery.getBody(), "UTF-8" ); message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message; System.out.println("error 消息已经接收:\n" + message); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> { }); } }
C2消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class ReceiveLogsDirect02 { private static final String EXCHANGE_NAME = "direct_logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = "console" ; channel.queueDeclare(queueName, false , false , false , null ); channel.queueBind(queueName, EXCHANGE_NAME, "info" ); channel.queueBind(queueName, EXCHANGE_NAME, "warning" ); System.out.println("等待接收消息..." ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String (delivery.getBody(), "UTF-8" ); message = "接收绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message; System.out.println("info和warning 消息已经接收:\n" + message); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> { }); } }
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Map<String, String> bindingKeyMap = new HashMap <>(); bindingKeyMap.put("info" , "普通 info 信息" ); bindingKeyMap.put("warning" , "警告 warning 信息" ); bindingKeyMap.put("error" , "错误 error 信息" ); bindingKeyMap.put("debug" , "调试 debug 信息" ); for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME, bindingKey, null , message.getBytes("UTF-8" )); System.out.println("生产者发出消息:" + message); } } }
Topics exchange(主题交换机(主题模式)) Topic 的介绍 在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是使用了 direct 交换机,从而有能实现有选择性地接收日志。
尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性——比方说我们想接收的日志类型有 info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候direct 就办不到了。这个时候就只能使用 topic 类型
Topic 的要求
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表 ,以点号分隔开 。这些单词可以是任意单词
比如说:"stock.usd.nyse", "nyse.vmw", "qui ck.orange.rabbit" 这种类型的。
当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中, 其中有两个替换符是大家需要注意的:
*(星号)可以代替一个位置 #(井号)可以替代零个或多个位置 Topic 匹配案例 下图绑定关系如下
RabbitMQ-00000046 Q1-->绑定的是中间带 orange 带 3 个单词的字符串 (*.orange.*)
Q2-->绑定的是最后一个单词是 rabbit 的 3 个单词 (*.*.rabbit)
第一个单词是 lazy 的多个单词 (lazy.#)
上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的
quick.orange.rabbit 被队列 Q1Q2 接收到 azy.orange.elephant 被队列 Q1Q2 接收到 quick.orange.fox 被队列 Q1 接收到 lazy.brown.fox 被队列 Q2 接收到 lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次 quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃 quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃 lazy.orange.male.rabbit 是四个单词但匹配 Q2
注意:
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了 如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了 Topic 实战
代码如下:
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Map<String, String> bindingKeyMap = new HashMap <>(); bindingKeyMap.put("quick.orange.rabbit" , "被队列 Q1Q2 接收到" ); bindingKeyMap.put("lazy.orange.elephant" , "被队列 Q1Q2 接收到" ); bindingKeyMap.put("quick.orange.fox" , "被队列 Q1 接收到" ); bindingKeyMap.put("lazy.brown.fox" , "被队列 Q2 接收到" ); bindingKeyMap.put("lazy.pink.rabbit" , "虽然满足两个绑定但只被队列 Q2 接收一次" ); bindingKeyMap.put("quick.brown.fox" , "不匹配任何绑定不会被任何队列接收到会被丢弃" ); bindingKeyMap.put("quick.orange.male.rabbit" , "是四个单词不匹配任何绑定会被丢弃" ); bindingKeyMap.put("lazy.orange.male.rabbit" , "是四个单词但匹配 Q2" ); for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) { String bindingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME, bindingKey, null , message.getBytes("UTF-8" )); System.out.println("生产者发出消息:" + message); } } }
消费者Q1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class ReceiveLogsTopic01 { private static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = "Q1" ; channel.queueDeclare(queueName, false , false , false , null ); channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*" ); System.out.println("等待接收消息........... " ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String (delivery.getBody(), "UTF-8" ); System.out.println(" 接收队列:" + queueName + " 绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> { }); } }
消费者Q2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class ReceiveLogsTopic02 { private static final String EXCHANGE_NAME = "topic_logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = "Q2" ; channel.queueDeclare(queueName, false , false , false , null ); channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit" ); channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#" ); System.out.println("等待接收消息........... " ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String (delivery.getBody(), "UTF-8" ); System.out.println(" 接收队列:" + queueName + " 绑定键:" + delivery.getEnvelope().getRoutingKey() + ",消息:" + message); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> { }); } }
四、死信的概念 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理 解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息 进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费 ,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的来源 死信实战
死信之TTL过期(延迟队列前置) 消费者 C1 代码:(启动之后关闭该消费者 模拟其接收不到消息)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class Consumer01 { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static final String DEAD_EXCHANGE = "dead_exchange" ; public static final String NORMAL_QUEUE = "normal_queue" ; public static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); Map<String,Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" ,DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"lisi" ); channel.queueDeclare(NORMAL_QUEUE,false ,false ,false ,arguments); channel.queueDeclare(DEAD_QUEUE,false ,false ,false ,null ); channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan" ); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi" ); System.out.println("等待接收消息..." ); DeliverCallback deliverCallback = (consumerTag,message) ->{ System.out.println("Consumer01接受的消息是:" +new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(NORMAL_QUEUE,true ,deliverCallback,consumerTag -> {}); } }
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Producer { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); AMQP.BasicProperties properties = new AMQP .BasicProperties().builder().expiration("10000" ).build(); for (int i = 1 ; i <11 ; i++) { String message = "info" +i; channel.basicPublish(NORMAL_EXCHANGE,"zhangsan" ,properties,message.getBytes()); } } }
先启动消费者 C1,创建出队列,然后停止该 C1 的运行,则 C1 将无法收到队列的消息,无法收到的消息 10 秒后进入死信队列。启动生产者 producer 生产消息
消费者 C2 代码:
以上步骤完成后,启动 C2 消费者,它消费死信队列里面的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class Consumer02 { public static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); System.out.println("等待接收死信消息..." ); DeliverCallback deliverCallback = (consumerTag, message) ->{ System.out.println("Consumer02接受的消息是:" +new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(DEAD_QUEUE,true ,deliverCallback,consumerTag -> {}); } }
死信之最大长度 1、消息生产者代码去掉 TTL 属性
2、C1 消费者修改以下代码(启动之后关闭该消费者 模拟其接收不到消息)
1 2 params.put("x-max-length" ,6 );
注意此时需要把原先队列删除 因为参数改变了
3、C2 消费者代码不变(启动 C2 消费者)
死信之消息被拒 1、消息生产者代码同上生产者一致
2、C1 消费者拒收消息 "info5",开启手动应答(启动之后关闭该消费者 模拟其接收不到消息)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public class Consumer01 { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static final String DEAD_EXCHANGE = "dead_exchange" ; public static final String NORMAL_QUEUE = "normal_queue" ; public static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = RabbitMQUtils.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); Map<String,Object> arguments = new HashMap <>(); arguments.put("x-dead-letter-exchange" ,DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"lisi" ); channel.queueDeclare(NORMAL_QUEUE,false ,false ,false ,arguments); channel.queueDeclare(DEAD_QUEUE,false ,false ,false ,null ); channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan" ); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi" ); System.out.println("等待接收消息..." ); DeliverCallback deliverCallback = (consumerTag,message) ->{ String msg = new String (message.getBody(), "UTF-8" ); if (msg.equals("info5" )){ System.out.println("Consumer01接受的消息是:" +msg+": 此消息是被C1拒绝的" ); channel.basicReject(message.getEnvelope().getDeliveryTag(), false ); }else { System.out.println("Consumer01接受的消息是:" +msg); channel.basicAck(message.getEnvelope().getDeliveryTag(), false ); } }; channel.basicConsume(NORMAL_QUEUE,false ,deliverCallback,consumerTag -> {}); } }
3、C2 消费者代码不变
启动消费者 1 然后再启动消费者 2
五、延迟队列介绍 延迟队列概念: (延迟队列用于延迟消息处理的时间)
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列 。
延迟队列使用场景:
订单在十分钟之内未支付则自动取消 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒 用户注册成功后,如果三天内没有登陆则进行短信提醒 用户发起退款,如果三天内没有得到处理则通知相关运营人员 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议 这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如: 发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;那我们一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?
如果数据量比较少,确实可以这样做,比如:对于「如果账单一周内未支付则进行自动结算」这样的需求, 如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景 ,如:「订单十分钟内未支付则关闭」,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下 。
RabbitMQ 中的 TTL TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。
换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
队列设置TTL
在创建队列的时候设置队列的“x-message-ttl”属性
1 2 3 Map<String, Object> params = new HashMap <>(); params.put("x-message-ttl" ,5000 );return QueueBuilder.durable("QA" ).withArguments(args).build();
消息设置TTL
是针对每条消息设置TTL
1 2 3 rabbitTemplate.converAndSend("X" ,"XC" ,message,correlationData -> { correlationData.getMessageProperties().setExpiration("5000" ); });
两者区别
如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间,具体看下方案例。
另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃
【重要※】整合 springboot 前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面, 成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。
1、创建一个空项目:
2、添加依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.47</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger2</artifactId > <version > 3.0.0</version > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger-ui</artifactId > <version > 3.0.0</version > </dependency > <dependency > <groupId > org.springframework.amqp</groupId > <artifactId > spring-rabbit-test</artifactId > <scope > test</scope > </dependency > </dependencies >
3、修改配置文件
1 2 3 4 spring.rabbitmq.host =42.192.149.71 spring.rabbitmq.port =5672 spring.rabbitmq.username =admin spring.rabbitmq.password =123456
4、添加Swagger 配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package com.oddfar.config;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import springfox.documentation.builders.ApiInfoBuilder;import springfox.documentation.service.ApiInfo;import springfox.documentation.service.Contact;import springfox.documentation.spi.DocumentationType;import springfox.documentation.spring.web.plugins.Docket;import springfox.documentation.swagger2.annotations.EnableSwagger2;@Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig () { return new Docket (DocumentationType.SWAGGER_2) .groupName("webApi" ) .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo () { return new ApiInfoBuilder () .title("rabbitmq 接口文档" ) .description("本文档描述了 rabbitmq 微服务接口定义" ) .version("1.0" ) .contact(new Contact ("zhiyuan" , "http://oddfar.com" , "test@qq.com" )) .build(); } }
队列 TTL 创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下:
原先配置队列信息,写在了生产者和消费者代码中,现在可写在配置类中,生产者只发消息,消费者只接受消息
1、配置文件类代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 @Configuration public class TtlQueueConfig { public static final String X_EXCHANGE="X" ; public static final String Y_DEAD_LETTER_EXCHANGE="Y" ; public static final String QUEUE_A="QA" ; public static final String QUEUE_B="QB" ; public static final String DEAD_LETTER_QUEUE="QD" ; @Bean("xExchange") public DirectExchange xExchange () { return new DirectExchange (X_EXCHANGE); } @Bean("yExchange") public DirectExchange yExchange () { return new DirectExchange (Y_DEAD_LETTER_EXCHANGE); } @Bean("queueA") public Queue queueA () { Map<String,Object> arguments = new HashMap <>(3 ); arguments.put("x-dead-letter-exchange" ,Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"YD" ); arguments.put("x-message-ttl" ,10000 ); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } @Bean("queueB") public Queue queueB () { Map<String,Object> arguments = new HashMap <>(3 ); arguments.put("x-dead-letter-exchange" ,Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"YD" ); arguments.put("x-message-ttl" ,40000 ); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } @Bean("queueD") public Queue queueD () { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } @Bean public Binding queueABindingX (@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA" ); } @Bean public Binding queueBBindingX (@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueB).to(xExchange).with("XB" ); } @Bean public Binding queueDBindingY (@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD).to(yExchange).with("YD" ); } }
2、消息生产者代码 Controller 层代码,获取消息,放到 RabbitMQ 里
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{message}") public void sendMsg (@PathVariable("message") String message) { log.info("当前时间:{},发送一条信息给两个TTL队列:{}" ,new Date ().toString(), message); rabbitTemplate.convertAndSend("X" ,"XA" ,"消息来自ttl为10s的队列:" +message); rabbitTemplate.convertAndSend("X" ,"XB" ,"消息来自ttl为40s的队列:" +message); } }
3、消息消费者代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD (Message message, Channel channel) throws IOException { String msg = new String (message.getBody()); log.info("当前时间:{},收到死信队列信息{}" , new Date ().toString(), msg); } }
发起一个请求:http://localhost:8888/ttl/sendMsg/嘻嘻嘻
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息, 然后被消费掉,这样一个延时队列就打造完成了。
不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求? -- 延时队列TTL优化
延时队列TTL优化 在这里新增了一个队列 QC,该队列不设置 TTL 时间,根据前端的请求确定 TTL 时间,绑定关系如下:
配置文件类代码:
新增一个配置文件类,用于新增队列 QC,也可以放在上方的配置文件类里
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Configuration public class MsgTtlQueueConfig { public static final String QUEUE_C = "QC" ; public static final String Y_DEAD_LETTER_EXCHANGE="Y" ; @Bean("queueC") public Queue QueueC () { Map<String,Object> arguments = new HashMap <>(3 ); arguments.put("x-dead-letter-exchange" ,Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"YD" ); return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build(); } @Bean public Binding queueCBindingX (@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueC).to(xExchange).with("XC" ); } }
生产者代码:
Controller 新增方法
该方法接收的请求要带有 TTL 时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{message}") public void sendMsg (@PathVariable("message") String message) { log.info("当前时间:{},发送一条信息给两个TTL队列:{}" ,new Date ().toString(),message); rabbitTemplate.convertAndSend("X" ,"XA" ,"消息来自ttl为10s的队列:" +message); rabbitTemplate.convertAndSend("X" ,"XB" ,"消息来自ttl为40s的队列:" +message); } @GetMapping("/sendExpirationMsg/{message}/{ttlTime}") public void sendMsg (@PathVariable("message") String message, @PathVariable("ttlTime") String ttlTime) { log.info("当前时间:{},发送一条时长是{}毫秒TTL信息给队列QC:{}" , new Date ().toString(),ttlTime,message); rabbitTemplate.convertAndSend("X" ,"XC" ,message,msg -> { msg.getMessageProperties().setExpiration(ttlTime); return msg; }); } }
发起请求
http://localhost:8080/ttl/sendExpirationMsg/你好1/20000
http://localhost:8080/ttl/sendExpirationMsg/你好2/2000
看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时「死亡」
因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行
这也就是为什么如图的时间:你好 2 延时 2 秒,却后执行,还要等待你好 1 消费后再执行你好2
Rabbitmq 插件实现延迟队列 上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。
安装延时队列插件
可去官网下载 (opens new window) 找到 rabbitmq_delayed_message_exchange 插件,放置到 RabbitMQ 的插件目录。
因为官网也是跳转去该插件的 GitHub 地址进行下载:点击跳转(opens new window)
打开 Linux,用 Xftp
将插件放到 RabbitMQ 的安装目录下的 plgins 目录,
RabbitMQ 与其 plgins 目录默认分别位于
1 2 3 4 cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8 cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
其中我的版本是 /rabbitmq_server-3.8.8
进入目录后执行下面命令让该插件生效,然后重启 RabbitMQ
1 2 3 4 rabbitmq-plugins enable rabbitmq_delayed_message_exchange systemctl restart rabbitmq-server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 [root@master plugins] Enabling plugins on node rabbit@master: rabbitmq_delayed_message_exchange The following plugins have been configured: rabbitmq_delayed_message_exchange rabbitmq_management rabbitmq_management_agent rabbitmq_web_dispatch Applying plugin configuration to rabbit@master... The following plugins have been enabled: rabbitmq_delayed_message_exchange started 1 plugins. [root@master plugins]
解释
安装命令不能出现插件版本和后缀,如 rabbitmq-plugins enable rabbitmq_delayed_message_exchange-3.8.0.ez
会报错
必须是 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
,后面不允许填入版本和文件后缀
打开 Web 界面,查看交换机的新增功能列表,如果多出了如图所示,代表成功添加插件
插件实战 在这里新增了一个队列delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
1、配置文件类代码 新增一个配置类 DelayedQueueConfig
,也可以放在原来的配置文件里,代码里使用了 CustomExchange
类,通过参数来自定义一个类型(direct、topic等)
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Configuration public class DelayedQueueConfig { public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange" ; public static final String DELAYED_QUEUE_NAME = "delayed.queue" ; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey" ; @Bean public Queue delayedQueue () { return new Queue (DELAYED_QUEUE_NAME); } @Bean public CustomExchange delayedExchange () { Map<String,Object> arguments = new HashMap <>(); arguments.put("x-delayed-type" ,"direct" ); return new CustomExchange (DELAYED_EXCHANGE_NAME,"x-delayed-message" , true ,false ,arguments); } @Bean public Binding delayedQueueBindingDelayedExchange ( @Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) { return BindingBuilder.bind(delayedQueue).to(delayedExchange) .with(DELAYED_ROUTING_KEY).noargs(); } }
2、生产者代码 在 controller 里新增一个方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @GetMapping("/sendDelayMsg/{message}/{delayTime}") public void sendMsg (@PathVariable("message") String message, @PathVariable("delayTime") Integer delayTime) { log.info("当前时间:{},发送一条时长是{}毫秒TTL信息给延迟队列delayed.queue:{}" , new Date ().toString(),delayTime,message); rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> { msg.getMessageProperties().setDelay(delayTime); return msg; }); }
3、消费者代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Slf4j @Component public class DelayQueueConsumer { public static final String DELAYED_QUEUE_NAME = "delayed.queue" ; @RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveDelayedQueue (Message message) { String msg = new String (message.getBody()); log.info("当前时间:{},收到延时队列的消息:{}" , new Date ().toString(), msg); } }
发送请求:
第二个消息被先消费掉了,符合预期(可以看到哪怕 hello1 需要20秒再进入延时队列,hello2 2 秒后直接进入延时队列,无需等待 hello1
总结 延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景。