package com.smartor.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * Helper component for declaring, binding, deleting and publishing to RabbitMQ
 * delayed-message exchanges and their queues.
 *
 * <p>Exchanges are declared with the custom type {@code x-delayed-message} and the
 * argument {@code x-delayed-type=topic}; this presumably requires the RabbitMQ
 * delayed-message exchange plugin to be enabled on the broker.
 *
 * <p>The phone exchange and its queues are declared once this component's
 * dependencies have been injected (see {@link #afterPropertiesSet()}).
 */
@Component
public class RabbitMqCallPhoneConfig implements InitializingBean {

    /** Custom exchange type used for every exchange declared by this class. */
    private static final String DELAYED_EXCHANGE_TYPE = "x-delayed-message";

    /** Exchange argument naming the routing semantics applied after the delay. */
    private static final String DELAYED_TYPE_ARGUMENT = "x-delayed-type";

    /** Message header carrying the per-message delay. */
    private static final String DELAY_HEADER = "x-delay";

    @Autowired
    private AmqpAdmin amqpAdmin;

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Declares a durable, non-auto-delete delayed exchange that routes like a topic exchange.
     *
     * @param exchangeName exchange name
     */
    public void createExchange(String exchangeName) {
        Map<String, Object> args = new HashMap<>();
        args.put(DELAYED_TYPE_ARGUMENT, "topic");
        CustomExchange exchange =
                new CustomExchange(exchangeName, DELAYED_EXCHANGE_TYPE, true, false, args);
        amqpAdmin.declareExchange(exchange);
    }

    /**
     * Declares a durable, non-exclusive, non-auto-delete queue.
     *
     * @param queueName queue name
     */
    public void createQueue(String queueName) {
        Queue queue = new Queue(queueName, true, false, false);
        amqpAdmin.declareQueue(queue);
    }

    /**
     * Binds a queue to an exchange.
     *
     * @param changeName exchange name
     * @param routingKey routing key
     * @param queueName  queue name
     */
    public void bindExchange(String changeName, String routingKey, String queueName) {
        Binding binding =
                new Binding(queueName, Binding.DestinationType.QUEUE, changeName, routingKey, null);
        amqpAdmin.declareBinding(binding);
    }

    /**
     * Publishes a message to an exchange with a delay.
     *
     * @param changeName exchange name
     * @param routingKey routing key
     * @param data       message payload
     * @param times      delay written to the {@code x-delay} header; presumably
     *                   milliseconds, as defined by the delayed-message plugin
     */
    public void sendMessage(String changeName, String routingKey, String data, Long times) {
        this.rabbitTemplate.convertAndSend(changeName, routingKey, data, message -> {
            // The delay is passed as a header (not a message property), so it may be a long.
            message.getMessageProperties().setHeader(DELAY_HEADER, times);
            return message;
        });
    }

    /**
     * Deletes an exchange.
     *
     * @param changeName exchange name
     */
    public void deleteExchange(String changeName) {
        amqpAdmin.deleteExchange(changeName);
    }

    /**
     * Deletes a queue.
     *
     * @param queueName queue name
     */
    public void deleteQueue(String queueName) {
        amqpAdmin.deleteQueue(queueName);
    }

    /**
     * Declares an exchange and a queue, then binds them with the given routing key.
     *
     * @param exchangName exchange name
     * @param queueName   queue name
     * @param routingKey  routing key used for the binding
     * @return always {@code true}; failures surface as exceptions from the underlying calls
     */
    public Boolean createRabbitMq(String exchangName, String queueName, String routingKey) {
        this.createExchange(exchangName);
        this.createQueue(queueName);
        this.bindExchange(exchangName, routingKey, queueName);
        return true;
    }

    /**
     * Declares the phone exchange and its queues up front.
     *
     * <p>This used to be a {@code void} {@code @Bean} method, which is not a valid bean
     * definition; it is now invoked from {@link #afterPropertiesSet()} instead.
     */
    public void createMq() {
        createRabbitMq("phone_exchange", "ob_queue", "phone.ob1");
        createRabbitMq("phone_exchange", "task_queue", "phone.123");
    }

    /**
     * Runs after dependency injection and declares the predefined exchange and queues.
     */
    @Override
    public void afterPropertiesSet() {
        createMq();
    }
}