package com.smartor.config;
|
|
import org.springframework.amqp.core.*;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.stereotype.Component;
|
|
import java.util.HashMap;
|
import java.util.Map;
|
|
@Component
|
public class RabbitMqCallPhoneConfig {
|
|
@Autowired
|
private AmqpAdmin amqpAdmin;
|
|
@Autowired
|
private AmqpTemplate rabbitTemplate;
|
|
/**
|
* 创建交换机
|
*
|
* @param exchangeName 交换机名称
|
*/
|
public void createExchange(String exchangeName) {
|
Map<String, Object> args = new HashMap<>();
|
args.put("x-delayed-type", "topic");
|
CustomExchange exchange = new CustomExchange(exchangeName, "x-delayed-message", true, false, args);
|
amqpAdmin.declareExchange(exchange);
|
}
|
|
/**
|
* 创建队列
|
*
|
* @param queueName 队列名称
|
*/
|
public void createQueue(String queueName) {
|
Queue queue = new Queue(queueName, true, false, false);
|
amqpAdmin.declareQueue(queue);
|
}
|
|
/**
|
* 交换机绑定
|
*
|
* @param changeName 交换机名称
|
* @param routingKey 路由key
|
* @param queueName 队列名称
|
*/
|
public void bindExchange(String changeName, String routingKey, String queueName) {
|
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, changeName, routingKey, null);
|
amqpAdmin.declareBinding(binding);
|
}
|
|
/**
|
* 发送信息到交换机
|
*
|
* @param changeName 交换机名称
|
* @param routingKey 路由key
|
* @param data 消息
|
*/
|
public void sendMessage(String changeName, String routingKey, String data, Long times) {
|
|
this.rabbitTemplate.convertAndSend(changeName, routingKey, data, message -> {
|
//注意这里时间可以使long,而且是设置header
|
message.getMessageProperties().setHeader("x-delay", times);
|
return message;
|
});
|
// rabbitTemplate.convertAndSend(changeName, routingKey, message);
|
}
|
|
/**
|
* 删除交换机
|
*
|
* @param changeName 交换机名称
|
*/
|
public void deleteExchange(String changeName) {
|
amqpAdmin.deleteExchange(changeName);
|
}
|
|
/**
|
* 删除队列
|
*
|
* @param queueName 队列名称
|
*/
|
public void deleteQueue(String queueName) {
|
amqpAdmin.deleteQueue(queueName);
|
}
|
|
/**
|
* 创建MQ
|
*
|
* @param exchangName
|
* @param queueName
|
* @param routingKey
|
* @return
|
*/
|
public Boolean createRabbitMq(String exchangName, String queueName, String routingKey) {
|
this.createExchange(exchangName);
|
this.createQueue(queueName);
|
this.bindExchange(exchangName, routingKey, queueName);
|
return true;
|
}
|
|
/**
|
* 先将需要创建的队列建好
|
*/
|
@Bean
|
public void createMq() {
|
createRabbitMq("phone_exchange", "ob_queue", "phone.ob1");
|
createRabbitMq("phone_exchange", "task_queue", "phone.123");
|
}
|
|
}
|