package com.ruoyi.web.component;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.ruoyi.common.core.redis.RedisCache;
|
import com.ruoyi.common.utils.DateUtils;
|
import com.ruoyi.web.task.PhoneTask;
|
import com.smartor.config.RabbitMqConfig;
|
import com.smartor.domain.IvrLibaTemplateVO;
|
import com.smartor.domain.IvrTaskcall;
|
import com.smartor.domain.IvrTaskcallMQ;
|
import com.smartor.mapper.IvrTaskcallMapper;
|
import com.smartor.service.IIvrLibaTemplateService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.amqp.core.Message;
|
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
import com.rabbitmq.client.Channel;
|
|
import java.io.IOException;
|
import java.util.List;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
|
@Slf4j
|
@Component//监听此队列
|
public class RabbitMqReceiver {
|
|
private static IIvrLibaTemplateService ivrLibaTemplateService;
|
|
private static IvrTaskcallMapper ivrTaskcallMapper;
|
|
private static RedisCache redisCache;
|
|
// 创建固定大小的线程池
|
private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
|
|
|
@Autowired
|
public void setIIvrLibaTemplateService(IIvrLibaTemplateService ivrLibaTemplateService) {
|
RabbitMqReceiver.ivrLibaTemplateService = ivrLibaTemplateService;
|
}
|
|
@Autowired
|
public void setIvrTaskcallMapper(IvrTaskcallMapper ivrTaskcallMapper) {
|
RabbitMqReceiver.ivrTaskcallMapper = ivrTaskcallMapper;
|
}
|
|
@Autowired
|
public void setRedisCache(RedisCache redisCache) {
|
RabbitMqReceiver.redisCache = redisCache;
|
}
|
|
// @RabbitListener(queues = RabbitMqConfig.delay_queue)
|
// public void consultReceiveDealy(String content, Message message, Channel channel) throws IOException {
|
// log.info("----------------接收延迟队列消息--------------------");
|
// //通知 MQ 消息已被接收,可以ACK(从队列中删除)了
|
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
// try {
|
// log.info("=============Do Something==============");
|
// } catch (Exception e) {
|
// log.error("============消费失败,尝试消息补发再次消费!==============");
|
// log.error(e.getMessage());
|
// /**
|
// * basicRecover方法是进行补发操作,
|
// * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
|
// * 设置为false是只补发给当前的consumer
|
// */
|
// channel.basicRecover(false);
|
// }
|
// }
|
|
/**
|
* 电话随访
|
* concurrency = "50" 并发数为50
|
*/
|
@RabbitListener(queues = RabbitMqConfig.phone_queue, concurrency = "50")
|
public void phoneVisit(String content, Message message, Channel channel) throws IOException {
|
// System.out.println(content);
|
// IvrTaskcallMQ ivrTaskcallMQ = null;
|
// ObjectMapper mapper = new ObjectMapper();
|
// try {
|
// ivrTaskcallMQ = mapper.readValue(content, IvrTaskcallMQ.class);
|
// } catch (JsonProcessingException e) {
|
// e.printStackTrace();
|
// }
|
// //通过模板ID获取模板问题
|
// IvrLibaTemplateVO ivrLibaTemplateVO = new IvrLibaTemplateVO();
|
// ivrLibaTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid()));
|
// IvrLibaTemplateVO ivrLibaTemplateVO1 = ivrLibaTemplateService.selectInfoByCondition(ivrLibaTemplateVO);
|
//
|
// //通过任务ID拿到患者信息
|
// IvrTaskcall ivrTaskcall = new IvrTaskcall();
|
// ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid());
|
// List<IvrTaskcall> ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall);
|
//
|
// for (IvrTaskcall ivrTaskcall1 : ivrTaskcalls) {
|
// //通过多线程的方式去打电话
|
// executorService.submit(new PhoneTask(ivrTaskcall1, ivrLibaTemplateVO1, redisCache));
|
// }
|
//通知 MQ 消息已被接收,可以ACK(从队列中删除)了 (这个需要根据业务再去处理ACK)
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
try {
|
log.info("=============Do Something==============");
|
} catch (Exception e) {
|
log.error("============消费失败,尝试消息补发再次消费!==============");
|
log.error(e.getMessage());
|
/**
|
* basicRecover方法是进行补发操作,
|
* 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
|
* 设置为false是只补发给当前的consumer
|
*/
|
channel.basicRecover(false);
|
}
|
}
|
}
|