liusheng
2024-05-28 500130efe30180fe635ba4482e097e848d37c7e3
ruoyi-admin/src/main/java/com/ruoyi/web/component/RabbitMqReceiver.java
@@ -4,15 +4,17 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.common.enums.ServiceFromEnum;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.web.task.PhoneTask;
import com.smartor.common.SendService;
import com.smartor.config.PhoneUtils;
import com.smartor.config.RabbitMqCallPhoneConfig;
import com.smartor.config.RobotPhoneUtils;
import com.smartor.domain.*;
import com.smartor.mapper.IvrTaskMapper;
import com.smartor.mapper.IvrTaskSingleMapper;
import com.smartor.service.IIvrLibaTemplateService;
import com.smartor.service.IIvrTaskTemplateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@@ -20,6 +22,7 @@
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@@ -32,8 +35,10 @@
@Component//监听此队列
public class RabbitMqReceiver {
    @Value("${phonePath}")
    private String phonePath;
    private static IIvrLibaTemplateService ivrLibaTemplateService;
    private static IIvrTaskTemplateService ivrTaskTemplateService;
    private static IvrTaskSingleMapper ivrTaskcallMapper;
@@ -45,13 +50,19 @@
    private static RabbitMqCallPhoneConfig rabbitMqCallPhoneConfig;
    private static RobotPhoneUtils robotPhoneUtils;
    // Fixed-size thread pool (10 threads)
    private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
    @Autowired
    public void setIIvrTaskTemplateService(IIvrTaskTemplateService ivrTaskTemplateService) {
        RabbitMqReceiver.ivrTaskTemplateService = ivrTaskTemplateService;
    }
    @Autowired
    public void setIIvrLibaTemplateService(IIvrLibaTemplateService ivrLibaTemplateService) {
        RabbitMqReceiver.ivrLibaTemplateService = ivrLibaTemplateService;
    public void setRobotPhoneUtilsService(RobotPhoneUtils robotPhoneUtils) {
        RabbitMqReceiver.robotPhoneUtils = robotPhoneUtils;
    }
    @Autowired
@@ -80,11 +91,12 @@
    }
    /**
     * Phone follow-up
     * Task follow-up
     * concurrency = "50"  sets the concurrency to 50
     *
     * @param content textual message body (presumably the JSON that is deserialized into
     *                {@code IvrTaskcallMQ} -- TODO confirm)
     * @param message the AMQP message; its delivery tag is used for ack / reject
     * @param channel the channel on which the delivery is acknowledged or rejected
     * @throws IOException if rejecting the delivery in the failure path fails
     */
    @RabbitListener(queues = "phone_queue", concurrency = "50")
    @RabbitListener(queues = "task_queue", concurrency = "50")
    public void phoneVisit(String content, Message message, Channel channel) throws IOException {
        try {
            IvrTaskcallMQ ivrTaskcallMQ = null;
@@ -98,32 +110,53 @@
            } catch (JsonProcessingException e) {
                // NOTE(review): the exception is only printed; ivrTaskcallMQ is initialized to null above,
                // so it may still be null when dereferenced below -- confirm.
                e.printStackTrace();
            }
            // Fetch the template questions by template ID
            IvrLibaTemplateVO ivrLibaTemplateVO = new IvrLibaTemplateVO();
            ivrLibaTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid()));
            IvrLibaTemplateVO ivrLibaTemplateVO1 = ivrLibaTemplateService.selectInfoByCondition(ivrLibaTemplateVO);
            // Fetch the patient information by task ID
            IvrTaskSingle ivrTaskcall = new IvrTaskSingle();
            ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid());
            List<IvrTaskSingle> ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall);
            for (IvrTaskSingle ivrTaskcall1 : ivrTaskcalls) {
                IvrTask ivrTask = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcall1.getTaskid());
                // NOTE(review): getSendState() is passed to StringUtils.isNotEmpty yet compared with the
                // int literals 3 and 4 via equals -- confirm its declared type, the comparison may never match.
                if (StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(3) || StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(4)) {
                    // If the task has been "paused" or "terminated"
                    break;
                }
//                // Place the phone calls using multiple threads
                executorService.submit(new PhoneTask(ivrTaskcall1, ivrLibaTemplateVO1, redisCache, rabbitMqCallPhoneConfig, message, sendService));
            // Check whether the stopstate in ivrTaskcallMQ matches the one in ivr_task; if it does not, the task has been paused
            IvrTask ivrTask1 = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcallMQ.getTaskid());
            // NOTE(review): != compares references if getStopState() returns a boxed type -- confirm.
            if (ivrTask1.getStopState() != ivrTaskcallMQ.getStopState()) {
                // Remove the message from the queue
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            }
            // Notify MQ that the message has been received and can be ACKed (removed from the queue)   (ACK handling still needs to be adapted to the business logic)
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            if (StringUtils.isNotEmpty(ivrTaskcallMQ.getServicefrom())) {
                // If the task's send method is not empty
                String[] split = ivrTaskcallMQ.getServicefrom().split(",");
                for (String serviceFrom : split) {
                    String descByCode = ServiceFromEnum.getDescByCode(Integer.valueOf(serviceFrom));
                    // Consider refactoring this with the strategy pattern; otherwise it is too ugly
                    if (descByCode.equals("电话")) {
                        // Fetch the template questions by template ID
                        IvrTaskTemplateVO ivrTaskTemplateVO = new IvrTaskTemplateVO();
                        ivrTaskTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid()));
                        IvrTaskTemplateVO ivrTaskTemplateVO1 = ivrTaskTemplateService.selectInfoByCondition(ivrTaskTemplateVO);
                        // Fetch the patient information by task ID
                        IvrTaskSingle ivrTaskcall = new IvrTaskSingle();
                        ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid());
                        List<IvrTaskSingle> ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall);
                        for (IvrTaskSingle ivrTaskcall1 : ivrTaskcalls) {
                            IvrTask ivrTask = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcall1.getTaskid());
                            if (StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(3) || StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(4)) {
                                // If the task has been "paused" or "terminated"
                                break;
                            }
                            // Place the phone calls using multiple threads
                            executorService.submit(new PhoneTask(ivrTaskcall1, ivrTaskTemplateVO1, redisCache, rabbitMqCallPhoneConfig, message, sendService, phonePath, robotPhoneUtils));
                        }
                    } else if (descByCode.equals("公众号")) {
                    }
                    // Notify MQ that the message has been received and can be ACKed (removed from the queue)   (ACK handling still needs to be adapted to the business logic)
                    // NOTE(review): this ack sits inside the per-serviceFrom loop, so it runs once per entry of split -- confirm that is intended.
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                }
            }
        } catch (Exception e) {
            log.error("============消费失败,尝试消息补发再次消费!==============");
            log.error(e.getMessage());
            // Reject the delivery with requeue = true.
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
    /**
@@ -162,6 +195,7 @@
        }
    }
    @Bean
    public SimpleRabbitListenerContainerFactory jsonContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();