package com.ruoyi.web.component; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.common.utils.DateUtils; import com.ruoyi.web.task.PhoneTask; import com.smartor.config.RabbitMqConfig; import com.smartor.domain.IvrLibaTemplateVO; import com.smartor.domain.IvrTaskcall; import com.smartor.domain.IvrTaskcallMQ; import com.smartor.mapper.IvrTaskcallMapper; import com.smartor.service.IIvrLibaTemplateService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; import java.io.IOException; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j @Component//监听此队列 public class RabbitMqReceiver { private static IIvrLibaTemplateService ivrLibaTemplateService; private static IvrTaskcallMapper ivrTaskcallMapper; private static RedisCache redisCache; // 创建固定大小的线程池 private static final ExecutorService executorService = Executors.newFixedThreadPool(10); @Autowired public void setIIvrLibaTemplateService(IIvrLibaTemplateService ivrLibaTemplateService) { RabbitMqReceiver.ivrLibaTemplateService = ivrLibaTemplateService; } @Autowired public void setIvrTaskcallMapper(IvrTaskcallMapper ivrTaskcallMapper) { RabbitMqReceiver.ivrTaskcallMapper = ivrTaskcallMapper; } @Autowired public void setRedisCache(RedisCache redisCache) { RabbitMqReceiver.redisCache = redisCache; } // @RabbitListener(queues = RabbitMqConfig.delay_queue) // public void consultReceiveDealy(String content, Message message, Channel channel) throws IOException { // log.info("----------------接收延迟队列消息--------------------"); // //通知 MQ 消息已被接收,可以ACK(从队列中删除)了 // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // try { // log.info("=============Do Something=============="); // } catch (Exception e) { // log.error("============消费失败,尝试消息补发再次消费!=============="); // log.error(e.getMessage()); // /** // * basicRecover方法是进行补发操作, // * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到, // * 设置为false是只补发给当前的consumer // */ // channel.basicRecover(false); // } // } /** * 电话随访 * concurrency = "50" 并发数为50 */ @RabbitListener(queues = RabbitMqConfig.phone_queue, concurrency = "50") public void phoneVisit(String content, Message message, Channel channel) throws IOException { // System.out.println(content); // IvrTaskcallMQ ivrTaskcallMQ = null; // ObjectMapper mapper = new ObjectMapper(); // try { // ivrTaskcallMQ = mapper.readValue(content, IvrTaskcallMQ.class); // } catch (JsonProcessingException e) { // e.printStackTrace(); // } // //通过模板ID获取模板问题 // IvrLibaTemplateVO ivrLibaTemplateVO = new IvrLibaTemplateVO(); // ivrLibaTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid())); // IvrLibaTemplateVO ivrLibaTemplateVO1 = ivrLibaTemplateService.selectInfoByCondition(ivrLibaTemplateVO); // // //通过任务ID拿到患者信息 // IvrTaskcall ivrTaskcall = new IvrTaskcall(); // ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid()); // List ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall); // // for (IvrTaskcall ivrTaskcall1 : ivrTaskcalls) { // //通过多线程的方式去打电话 // executorService.submit(new PhoneTask(ivrTaskcall1, ivrLibaTemplateVO1, redisCache)); // } //通知 MQ 消息已被接收,可以ACK(从队列中删除)了 (这个需要根据业务再去处理ACK) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); try { log.info("=============Do Something=============="); } catch (Exception e) { log.error("============消费失败,尝试消息补发再次消费!=============="); log.error(e.getMessage()); /** * basicRecover方法是进行补发操作, * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到, * 设置为false是只补发给当前的consumer */ channel.basicRecover(false); } } }