liusheng
2024-06-05 0ef7bb2a442c039b01599010fa3370f7c59ef0c4
ruoyi-admin/src/main/java/com/ruoyi/web/component/RabbitMqReceiver.java
@@ -1,18 +1,22 @@
package com.ruoyi.web.component;
import com.alibaba.fastjson2.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.common.enums.ServiceFromEnum;
import com.ruoyi.common.utils.RSAPublicKeyExample;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.web.task.PhoneTask;
import com.smartor.common.SendService;
import com.smartor.config.PhoneUtils;
import com.smartor.config.RabbitMqCallPhoneConfig;
import com.smartor.config.RobotPhoneUtils;
import com.smartor.domain.*;
import com.smartor.mapper.IvrTaskMapper;
import com.smartor.mapper.IvrTaskSingleMapper;
import com.smartor.service.IIvrLibaTemplateService;
import com.smartor.service.IIvrTaskTemplateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@@ -20,20 +24,32 @@
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component//监听此队列
public class RabbitMqReceiver {
    // Injected from the "phonePath" property; handed to each PhoneTask created in phoneVisit.
    @Value("${phonePath}")
    private String phonePath;
    // Static collaborator assigned by the setter setIIvrLibaTemplateService.
    private static IIvrLibaTemplateService ivrLibaTemplateService;
    // Injected from the "pub_key" property; passed as the key argument to RSAPublicKeyExample.encryptedData.
    @Value("${pub_key}")
    private String pub_key;
    // Injected from the "req_path" property; prefix of the "/outsideChain" link built in phoneVisit.
    @Value("${req_path}")
    private String req_path;
    // Static collaborator assigned by the @Autowired setter setIIvrTaskTemplateService.
    private static IIvrTaskTemplateService ivrTaskTemplateService;
    // NOTE(review): presumably assigned by a setter outside this excerpt — confirm.
    private static IvrTaskSingleMapper ivrTaskcallMapper;
@@ -45,13 +61,20 @@
    // NOTE(review): presumably assigned by a setter outside this excerpt — confirm.
    private static RabbitMqCallPhoneConfig rabbitMqCallPhoneConfig;
    // Static collaborator assigned by the setter setRobotPhoneUtilsService.
    private static RobotPhoneUtils robotPhoneUtils;
    // Create a fixed-size thread pool (10 threads); phoneVisit submits PhoneTask instances to it.
    private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
    /**
     * Stores the Spring-injected task template service in the static field of this class,
     * where phoneVisit reads it to look up a template by ID.
     *
     * @param ivrTaskTemplateService service instance to store
     */
    @Autowired
    public void setIIvrTaskTemplateService(IIvrTaskTemplateService ivrTaskTemplateService) {
        RabbitMqReceiver.ivrTaskTemplateService = ivrTaskTemplateService;
    }
    // NOTE(review): the two setter headers below look like the removed and added sides of a patch
    // that share one @Autowired annotation and one closing brace — confirm against the real file.
    @Autowired
    public void setIIvrLibaTemplateService(IIvrLibaTemplateService ivrLibaTemplateService) {
        // Stores the injected service in the static field of this class.
        RabbitMqReceiver.ivrLibaTemplateService = ivrLibaTemplateService;
    public void setRobotPhoneUtilsService(RobotPhoneUtils robotPhoneUtils) {
        // Stores the injected RobotPhoneUtils in the static field of this class.
        RabbitMqReceiver.robotPhoneUtils = robotPhoneUtils;
    }
    @Autowired
@@ -80,17 +103,17 @@
    }
    /**
     * Phone follow-up / task follow-up listener.
     * concurrency = "50" sets the listener concurrency to 50.
     *
     * <p>Parses {@code content} into an {@code IvrTaskcallMQ} with Jackson (wrapping it in braces
     * when the leading "{" is missing), loads the matching {@code IvrTask}, and acknowledges the
     * message without further work when the stop state in the message differs from the stored one.
     * Otherwise the comma-separated {@code preachform} codes are resolved through
     * {@code ServiceFromEnum.getDescByCode} and handled per delivery channel: the phone branch
     * submits one {@code PhoneTask} per {@code IvrTaskSingle} to the thread pool, while the SMS
     * and official-account branches send a link with RSA-encrypted task and patient IDs through
     * {@code sendService}.
     *
     * <p>On any exception a counter keyed by the task ID is kept in Redis with a 120-minute
     * expiry, the failure is logged, and the delivery is rejected with requeue.
     *
     * <p>NOTE(review): this excerpt appears to interleave removed and added patch lines (two
     * listener annotations, two declarations of ivrTaskcallMQ, two "if" headers, an old loop
     * followed by the new dispatch logic) — confirm the effective code against the real file.
     *
     * @param content raw message payload, expected to be JSON for IvrTaskcallMQ
     * @param message AMQP message; supplies the delivery tag and is passed on to PhoneTask
     * @param channel channel used to acknowledge or reject the delivery
     * @throws IOException propagated from the channel acknowledge/reject calls
     */
    @RabbitListener(queues = "phone_queue", concurrency = "50")
    @RabbitListener(queues = "task_queue", concurrency = "50")
    public void phoneVisit(String content, Message message, Channel channel) throws IOException {
        log.error("消息进来了--------------");
        IvrTaskcallMQ ivrTaskcallMQ = null;
        try {
            IvrTaskcallMQ ivrTaskcallMQ = null;
            ObjectMapper mapper = new ObjectMapper();
            try {
                // Payloads without a leading "{" are wrapped in braces before parsing.
                if (!content.contains("{")) {
                if (!content.startsWith("{")) {
                    ivrTaskcallMQ = mapper.readValue("{" + content + "}", IvrTaskcallMQ.class);
                } else {
                    ivrTaskcallMQ = mapper.readValue(content, IvrTaskcallMQ.class);
@@ -98,32 +121,96 @@
            } catch (JsonProcessingException e) {
                // NOTE(review): the parse failure is only printed; ivrTaskcallMQ stays null and the
                // code below dereferences it.
                e.printStackTrace();
            }
            // Fetch the template questions by template ID
            IvrLibaTemplateVO ivrLibaTemplateVO = new IvrLibaTemplateVO();
            ivrLibaTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid()));
            IvrLibaTemplateVO ivrLibaTemplateVO1 = ivrLibaTemplateService.selectInfoByCondition(ivrLibaTemplateVO);
            // Fetch the patient records by task ID
            IvrTaskSingle ivrTaskcall = new IvrTaskSingle();
            ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid());
            List<IvrTaskSingle> ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall);
            for (IvrTaskSingle ivrTaskcall1 : ivrTaskcalls) {
                IvrTask ivrTask = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcall1.getTaskid());
                if (StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(3) || StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(4)) {
                    // Stop when the task has been "paused" or "terminated"
                    break;
                }
//                // Place the phone calls using multiple threads
                executorService.submit(new PhoneTask(ivrTaskcall1, ivrLibaTemplateVO1, redisCache, rabbitMqCallPhoneConfig, message, sendService));
            // Compare the stopState carried by ivrTaskcallMQ with the one stored in ivr_task; a mismatch means the task was paused
            IvrTask ivrTask1 = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcallMQ.getTaskid());
            // NOTE(review): if getStopState() returns a boxed type, != compares references rather than values — confirm.
            if (ivrTask1.getStopState() != ivrTaskcallMQ.getStopState()) {
                // Remove the message from the queue
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            }
            // Tell MQ the message was received and can be ACKed (removed from the queue); ACK handling still has to follow the business rules
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            if (StringUtils.isNotEmpty(ivrTaskcallMQ.getPreachform())) {
                // When the task's delivery methods are not empty
                String[] split = ivrTaskcallMQ.getPreachform().split(",");
                for (String serviceFrom : split) {
                    String descByCode = ServiceFromEnum.getDescByCode(Integer.valueOf(serviceFrom));
                    // Consider refactoring this with the strategy pattern; the branching is hard to read
                    // Fetch the template questions by template ID
                    IvrTaskTemplateVO ivrTaskTemplateVO = new IvrTaskTemplateVO();
                    ivrTaskTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid()));
                    IvrTaskTemplateVO ivrTaskTemplateVO1 = ivrTaskTemplateService.selectInfoByCondition(ivrTaskTemplateVO);
                    // Fetch the patient records by task ID
                    IvrTaskSingle ivrTaskcall = new IvrTaskSingle();
                    ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid());
                    List<IvrTaskSingle> ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall);
                    if (descByCode.equals("电话")) {
                        for (IvrTaskSingle ivrTaskcall1 : ivrTaskcalls) {
                            IvrTask ivrTask = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcall1.getTaskid());
                            if (ivrTask.getSendState() != null && ivrTask.getSendState() == 3 || ivrTask.getSendState() != null && ivrTask.getSendState() == 4) {
                                // Stop when the task has been "paused" or "terminated"
                                break;
                            }
                            // Place the phone calls using multiple threads
                            executorService.submit(new PhoneTask(ivrTaskcall1, ivrTaskTemplateVO1, redisCache, rabbitMqCallPhoneConfig, message, sendService, phonePath, robotPhoneUtils));
                        }
                    } else if (descByCode.equals("多媒体")) {
                        // Multimedia (no handling yet)
                    } else if (descByCode.equals("纸质")) {
                        // Paper (no handling yet)
                    } else if (descByCode.equals("短信")) {
                        // SMS
//                        http://localhost:8099/followvisit/particty?param1=3&param2=348
                        // Encrypt the two URL parameters
                        RSAPublicKeyExample rsaPublicKeyExample = new RSAPublicKeyExample();
                        String taskId = rsaPublicKeyExample.encryptedData(ivrTask1.getTaskid().toString(), pub_key);
                        for (IvrTaskSingle ivrTaskSingle : ivrTaskcalls) {
                            String patid = rsaPublicKeyExample.encryptedData(ivrTaskSingle.getPatid().toString(), pub_key);
                            SendMagParam sendMagParam = new SendMagParam();
                            sendMagParam.setType("4");
                            // NOTE(review): the host and port are hard-coded here, unlike the official-account branch which uses req_path.
                            sendMagParam.setUrl("192.168.2.10:8099/followvisit/particty?param1=" + taskId + "&param2=" + patid);
                            sendService.sendMsg(sendMagParam);
                        }
                    } else if (descByCode.equals("公众号")) {
                        // Official account
                        RSAPublicKeyExample rsaPublicKeyExample = new RSAPublicKeyExample();
                        String taskId = rsaPublicKeyExample.encryptedData(ivrTask1.getTaskid().toString(), pub_key);
                        for (IvrTaskSingle ivrTaskSingle : ivrTaskcalls) {
                            String patid = rsaPublicKeyExample.encryptedData(ivrTaskSingle.getPatid().toString(), pub_key);
                            SendMagParam sendMagParam = new SendMagParam();
                            sendMagParam.setType("5");
                            sendMagParam.setUrl(req_path + "/outsideChain?param1=" + taskId + "&param2=" + patid + "&param3=" + ivrTask1.getTaskName());
                            log.info("链接完整路径:{}", req_path + "/outsideChain?param1=" + taskId + "&param2=" + patid + "&param3=" + ivrTask1.getTaskName());
                            // The template ID is hard-coded for now; make it selectable later
                            sendMagParam.setTmpCode("oG3pJHPVWpE81DmZsua_2tKwgJ97r0qz37z56ns7NB4");
                            Map map = new HashMap();
                            map.put("first", ivrTask1.getTaskName());
                            sendMagParam.setContent(JSON.toJSONString(map));
                            sendService.sendMsg(sendMagParam);
                        }
                    }
                    // Tell MQ the message was received and can be ACKed (removed from the queue); ACK handling still has to follow the business rules
                    // NOTE(review): this ack sits inside the per-serviceFrom loop, so the same delivery tag
                    // is acknowledged once per preachform entry — confirm that is intended.
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                }
            }
        } catch (Exception e) {
            // Failure counter kept in Redis under the task ID, with a 120-minute expiry.
            // NOTE(review): ivrTaskcallMQ can still be null here when parsing failed, which would make
            // the getTaskid() calls below throw — confirm.
            Integer integer = redisCache.getCacheObject(ivrTaskcallMQ.getTaskid().toString());
            if (integer != null && integer == 2) {
                // NOTE(review): after this ack, control still reaches the basicReject below for the same delivery tag.
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else if (integer == null) {
                redisCache.setCacheObject(ivrTaskcallMQ.getTaskid().toString(), 1, 120, TimeUnit.MINUTES);
            } else {
                redisCache.setCacheObject(ivrTaskcallMQ.getTaskid().toString(), integer + 1, 120, TimeUnit.MINUTES);
            }
            log.error("============消费失败,尝试消息补发再次消费!==============");
            log.error(e.getMessage());
            // Reject with requeue=true so the message is delivered again.
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
    /**
@@ -162,6 +249,7 @@
        }
    }
    @Bean
    public SimpleRabbitListenerContainerFactory jsonContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();