(资料图)
要实现RabbitMQ的消息队列延迟功能,一般采用官方提供的 rabbitmq_delayed_message_exchange插件。但RabbitMQ版本必须是3.5.8以上才支持该插件,否则得用其死信队列功能。
rabbitmq-plugins list命令用于查看RabbitMQ安装的插件。rabbitmq-plugins list检查RabbitMQ插件安装情况
如果没有安装插件,则直接访问官网进行下载
https://www.rabbitmq.com/community-plugins.html下载后,将其拷贝到RabbitMQ安装目录的plugins目录;并进行解压,如:
E:\software\RabbitMQ Server\rabbitmq_server-3.11.13\plugins打开cmd命令行窗口,如果系统已经配置RabbitMQ环境变量,则直接执行以下的命令进行安装;否则需要进入到RabbitMQ安装目录的sbin目录。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange4.0.0 com.olive rabbitmq-spring-demo 0.0.1-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.7.7 org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.5 org.apache.maven.plugins maven-compiler-plugin 1.8 1.8 server: port: 8080spring: #给项目来个名字 application: name: rabbitmq-spring-demo #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.1 port: 5672 username: admin password: admin123 #虚拟host。可以不设置,使用server默认host;不同虚拟路径下的队列是隔离的 virtual-host: /package com.olive.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.CustomExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;/** * RabbitMQ配置类 **/@Configurationpublic class RabbitMqConfig {public static final String DELAY_EXCHANGE_NAME = "delayed_exchange";public static final String DELAY_QUEUE_NAME = "delay_queue_name";public static final String DELAY_ROUTING_KEY = "delay_routing_key";@Beanpublic CustomExchange delayExchange() {Map args = new HashMap<>();args.put("x-delayed-type", "direct");return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true, false, args);}@Beanpublic Queue queue() {Queue queue = new Queue(DELAY_QUEUE_NAME, true);return queue;}@Beanpublic Binding binding(Queue queue, CustomExchange delayExchange) {return BindingBuilder.bind(queue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();}} 实现消息发送,设置消息延迟5s。
package com.olive.service;import java.text.SimpleDateFormat;import java.util.Date;import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import com.olive.config.RabbitMqConfig;/** * 消息发送者 **/@Servicepublic class CustomMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendMsg(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("消息发送时间:" + sdf.format(new Date()));rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE_NAME, RabbitMqConfig.DELAY_ROUTING_KEY, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 消息延迟5秒message.getMessageProperties().setHeader("x-delay", 5000);return message;}});}}package com.olive.service;import java.text.SimpleDateFormat;import java.util.Date;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import com.olive.config.RabbitMqConfig;/** * 消息接收者 **/@Componentpublic class CustomMessageReceiver {@RabbitListener(queues = RabbitMqConfig.DELAY_QUEUE_NAME)public void receive(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println(sdf.format(new Date()) + msg);System.out.println("Receiver:执行取消订单");}}package com.olive.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;import com.olive.service.CustomMessageSender;@RestControllerpublic class DelayMessageController {@Autowiredprivate CustomMessageSender customMessageSender;@GetMapping("/sendMessage")public String sendMessage() {// 发送消息customMessageSender.sendMsg("你已经支付超时,取消订单通知!");return "success";}}发送消息,访问
http://127.0.0.1:8080/sendMessage查看控制台打印的信息