package com.ruoyi.web.component;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.ruoyi.common.core.redis.RedisCache;
|
import com.ruoyi.web.task.PhoneTask;
|
import com.smartor.config.RabbitMqConfig;
|
import com.smartor.domain.IvrLibaTemplateVO;
|
import com.smartor.domain.IvrTaskSingle;
|
import com.smartor.domain.IvrTaskcallMQ;
|
import com.smartor.mapper.IvrTaskSingleMapper;
|
import com.smartor.service.IIvrLibaTemplateService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.amqp.core.Message;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.stereotype.Component;
|
import com.rabbitmq.client.Channel;
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
|
import java.io.IOException;
|
import java.util.List;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
|
@Slf4j
|
@Component//监听此队列
|
public class RabbitMqReceiver {
|
|
private static IIvrLibaTemplateService ivrLibaTemplateService;
|
|
private static IvrTaskSingleMapper ivrTaskcallMapper;
|
|
private static RedisCache redisCache;
|
|
// 创建固定大小的线程池
|
private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
|
|
|
@Autowired
|
public void setIIvrLibaTemplateService(IIvrLibaTemplateService ivrLibaTemplateService) {
|
RabbitMqReceiver.ivrLibaTemplateService = ivrLibaTemplateService;
|
}
|
|
@Autowired
|
public void setIvrTaskcallMapper(IvrTaskSingleMapper ivrTaskcallMapper) {
|
RabbitMqReceiver.ivrTaskcallMapper = ivrTaskcallMapper;
|
}
|
|
@Autowired
|
public void setRedisCache(RedisCache redisCache) {
|
RabbitMqReceiver.redisCache = redisCache;
|
}
|
|
// @RabbitListener(queues = RabbitMqConfig.delay_queue)
|
// public void consultReceiveDealy(String content, Message message, Channel channel) throws IOException {
|
// log.info("----------------接收延迟队列消息--------------------");
|
// //通知 MQ 消息已被接收,可以ACK(从队列中删除)了
|
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
// try {
|
// log.info("=============Do Something==============");
|
// } catch (Exception e) {
|
// log.error("============消费失败,尝试消息补发再次消费!==============");
|
// log.error(e.getMessage());
|
// /**
|
// * basicRecover方法是进行补发操作,
|
// * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
|
// * 设置为false是只补发给当前的consumer
|
// */
|
// channel.basicRecover(false);
|
// }
|
// }
|
|
/**
|
* 电话随访
|
* concurrency = "50" 并发数为50
|
*/
|
@RabbitListener(queues = RabbitMqConfig.phone_queue, concurrency = "50")
|
public void phoneVisit(String content, Message message, Channel channel) throws IOException {
|
try {
|
IvrTaskcallMQ ivrTaskcallMQ = null;
|
|
ObjectMapper mapper = new ObjectMapper();
|
try {
|
ivrTaskcallMQ = mapper.readValue("{" + content + "}", IvrTaskcallMQ.class);
|
} catch (JsonProcessingException e) {
|
e.printStackTrace();
|
}
|
//通过模板ID获取模板问题
|
IvrLibaTemplateVO ivrLibaTemplateVO = new IvrLibaTemplateVO();
|
ivrLibaTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid()));
|
IvrLibaTemplateVO ivrLibaTemplateVO1 = ivrLibaTemplateService.selectInfoByCondition(ivrLibaTemplateVO);
|
|
//通过任务ID拿到患者信息
|
IvrTaskSingle ivrTaskcall = new IvrTaskSingle();
|
ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid());
|
List<IvrTaskSingle> ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall);
|
|
for (IvrTaskSingle ivrTaskcall1 : ivrTaskcalls) {
|
//通过多线程的方式去打电话
|
executorService.submit(new PhoneTask(ivrTaskcall1, ivrLibaTemplateVO1, redisCache, channel, message));
|
}
|
//通知 MQ 消息已被接收,可以ACK(从队列中删除)了 (这个需要根据业务再去处理ACK)
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
|
log.info("=============Do Something==============");
|
} catch (Exception e) {
|
log.error("============消费失败,尝试消息补发再次消费!==============");
|
log.error(e.getMessage());
|
/**
|
* basicRecover方法是进行补发操作,
|
* 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
|
* 设置为false是只补发给当前的consumer
|
*/
|
channel.basicRecover(false);
|
}
|
}
|
|
@Bean
|
public SimpleRabbitListenerContainerFactory jsonContainerFactory(ConnectionFactory connectionFactory) {
|
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
factory.setConnectionFactory(connectionFactory);
|
factory.setMessageConverter(new Jackson2JsonMessageConverter());
|
return factory;
|
}
|
}
|