liusheng
2024-06-05 0ef7bb2a442c039b01599010fa3370f7c59ef0c4
ruoyi-admin/src/main/java/com/ruoyi/web/component/RabbitMqReceiver.java
@@ -1,41 +1,55 @@
package com.ruoyi.web.component;
import com.alibaba.fastjson2.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.common.enums.ServiceFromEnum;
import com.ruoyi.common.utils.RSAPublicKeyExample;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.web.task.PhoneTask;
import com.ruoyi.web.task.PhoneTask2;
import com.smartor.common.SendService;
import com.smartor.config.RabbitMqConfig;
import com.smartor.domain.IvrLibaTemplateVO;
import com.smartor.domain.IvrTask;
import com.smartor.domain.IvrTaskSingle;
import com.smartor.domain.IvrTaskcallMQ;
import com.smartor.config.PhoneUtils;
import com.smartor.config.RabbitMqCallPhoneConfig;
import com.smartor.config.RobotPhoneUtils;
import com.smartor.domain.*;
import com.smartor.mapper.IvrTaskMapper;
import com.smartor.mapper.IvrTaskSingleMapper;
import com.smartor.service.IIvrLibaTemplateService;
import com.smartor.service.IIvrTaskTemplateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component//监听此队列
public class RabbitMqReceiver {
    private static IIvrLibaTemplateService ivrLibaTemplateService;
    @Value("${phonePath}")
    private String phonePath;
    @Value("${pub_key}")
    private String pub_key;
    @Value("${req_path}")
    private String req_path;
    private static IIvrTaskTemplateService ivrTaskTemplateService;
    private static IvrTaskSingleMapper ivrTaskcallMapper;
@@ -45,18 +59,32 @@
    private static RedisCache redisCache;
    private static RabbitMqCallPhoneConfig rabbitMqCallPhoneConfig;
    private static RobotPhoneUtils robotPhoneUtils;
    // Fixed-size thread pool (10 workers); static, so it is shared by every use within this class.
    private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
    /**
     * Setter-injection hook: copies the injected {@link IIvrTaskTemplateService} into the
     * static {@code ivrTaskTemplateService} field of this class.
     *
     * @param ivrTaskTemplateService the service instance supplied through {@code @Autowired}
     */
    @Autowired
    public void setIIvrTaskTemplateService(IIvrTaskTemplateService ivrTaskTemplateService) {
        RabbitMqReceiver.ivrTaskTemplateService = ivrTaskTemplateService;
    }
    @Autowired
    public void setIIvrLibaTemplateService(IIvrLibaTemplateService ivrLibaTemplateService) {
        RabbitMqReceiver.ivrLibaTemplateService = ivrLibaTemplateService;
    public void setRobotPhoneUtilsService(RobotPhoneUtils robotPhoneUtils) {
        RabbitMqReceiver.robotPhoneUtils = robotPhoneUtils;
    }
    /**
     * Setter-injection hook: copies the injected {@link SendService} into the static
     * {@code sendService} field of this class.
     *
     * @param sendService the service instance supplied through {@code @Autowired}
     */
    @Autowired
    public void setSendService(SendService sendService) {
        RabbitMqReceiver.sendService = sendService;
    }
    /**
     * Setter-injection hook: copies the injected {@link RabbitMqCallPhoneConfig} into the
     * static {@code rabbitMqCallPhoneConfig} field of this class.
     *
     * @param rabbitMqCallPhoneConfig the configuration instance supplied through {@code @Autowired}
     */
    @Autowired
    public void setRabbitMqCallPhoneConfig(RabbitMqCallPhoneConfig rabbitMqCallPhoneConfig) {
        RabbitMqReceiver.rabbitMqCallPhoneConfig = rabbitMqCallPhoneConfig;
    }
    @Autowired
@@ -74,76 +102,154 @@
        RabbitMqReceiver.redisCache = redisCache;
    }
//    @RabbitListener(queues = RabbitMqConfig.delay_queue)
//    public void consultReceiveDealy(String content, Message message, Channel channel) throws IOException {
//        log.info("----------------接收延迟队列消息--------------------");
//        //通知 MQ 消息已被接收,可以ACK(从队列中删除)了
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//        try {
//            log.info("=============Do Something==============");
//        } catch (Exception e) {
//            log.error("============消费失败,尝试消息补发再次消费!==============");
//            log.error(e.getMessage());
//            /**
//             * basicRecover方法是进行补发操作,
//             * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
//             * 设置为false是只补发给当前的consumer
//             */
//            channel.basicRecover(false);
//        }
//    }
    /**
     * 电话随访
     * 任务随访
     * concurrency = "50"  并发数为50
     */
    @RabbitListener(queues = RabbitMqConfig.phone_queue, concurrency = "50")
    @RabbitListener(queues = "task_queue", concurrency = "50")
    public void phoneVisit(String content, Message message, Channel channel) throws IOException {
        log.error("消息进来了--------------");
        IvrTaskcallMQ ivrTaskcallMQ = null;
        try {
            IvrTaskcallMQ ivrTaskcallMQ = null;
            ObjectMapper mapper = new ObjectMapper();
            try {
                ivrTaskcallMQ = mapper.readValue("{" + content + "}", IvrTaskcallMQ.class);
                if (!content.startsWith("{")) {
                    ivrTaskcallMQ = mapper.readValue("{" + content + "}", IvrTaskcallMQ.class);
                } else {
                    ivrTaskcallMQ = mapper.readValue(content, IvrTaskcallMQ.class);
                }
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
            //通过模板ID获取模板问题
            IvrLibaTemplateVO ivrLibaTemplateVO = new IvrLibaTemplateVO();
            ivrLibaTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid()));
            IvrLibaTemplateVO ivrLibaTemplateVO1 = ivrLibaTemplateService.selectInfoByCondition(ivrLibaTemplateVO);
            //判断一下ivrTaskcallMQ中的stopstate是否与ivr_task中的一致,不一致,则说明是暂停了
            IvrTask ivrTask1 = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcallMQ.getTaskid());
            if (ivrTask1.getStopState() != ivrTaskcallMQ.getStopState()) {
                //将消息从队列中剔除
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            }
            //通过任务ID拿到患者信息
            IvrTaskSingle ivrTaskcall = new IvrTaskSingle();
            ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid());
            List<IvrTaskSingle> ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall);
            if (StringUtils.isNotEmpty(ivrTaskcallMQ.getPreachform())) {
                //如何任务发送方式不为空
                String[] split = ivrTaskcallMQ.getPreachform().split(",");
                for (String serviceFrom : split) {
                    String descByCode = ServiceFromEnum.getDescByCode(Integer.valueOf(serviceFrom));
                    //这里可以考虑用策略模式优化一下,不然太难看了
            for (IvrTaskSingle ivrTaskcall1 : ivrTaskcalls) {
                IvrTask ivrTask = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcall1.getTaskid());
                if (StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(3) || StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(4)) {
                    //如何任务被“暂停”或“终止”
                    break;
                    //通过模板ID获取模板问题
                    IvrTaskTemplateVO ivrTaskTemplateVO = new IvrTaskTemplateVO();
                    ivrTaskTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid()));
                    IvrTaskTemplateVO ivrTaskTemplateVO1 = ivrTaskTemplateService.selectInfoByCondition(ivrTaskTemplateVO);
                    //通过任务ID拿到患者信息
                    IvrTaskSingle ivrTaskcall = new IvrTaskSingle();
                    ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid());
                    List<IvrTaskSingle> ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall);
                    if (descByCode.equals("电话")) {
                        for (IvrTaskSingle ivrTaskcall1 : ivrTaskcalls) {
                            IvrTask ivrTask = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcall1.getTaskid());
                            if (ivrTask.getSendState() != null && ivrTask.getSendState() == 3 || ivrTask.getSendState() != null && ivrTask.getSendState() == 4) {
                                //如何任务被“暂停”或“终止”
                                break;
                            }
                            //通过多线程的方式去打电话
                            executorService.submit(new PhoneTask(ivrTaskcall1, ivrTaskTemplateVO1, redisCache, rabbitMqCallPhoneConfig, message, sendService, phonePath, robotPhoneUtils));
                        }
                    } else if (descByCode.equals("多媒体")) {
                        //多媒体
                    } else if (descByCode.equals("纸质")) {
                        //纸质
                    } else if (descByCode.equals("短信")) {
                        //短信
//                        http://localhost:8099/followvisit/particty?param1=3&param2=348
                        //对url中两个参数加密
                        RSAPublicKeyExample rsaPublicKeyExample = new RSAPublicKeyExample();
                        String taskId = rsaPublicKeyExample.encryptedData(ivrTask1.getTaskid().toString(), pub_key);
                        for (IvrTaskSingle ivrTaskSingle : ivrTaskcalls) {
                            String patid = rsaPublicKeyExample.encryptedData(ivrTaskSingle.getPatid().toString(), pub_key);
                            SendMagParam sendMagParam = new SendMagParam();
                            sendMagParam.setType("4");
                            sendMagParam.setUrl("192.168.2.10:8099/followvisit/particty?param1=" + taskId + "&param2=" + patid);
                            sendService.sendMsg(sendMagParam);
                        }
                    } else if (descByCode.equals("公众号")) {
                        //公众号
                        RSAPublicKeyExample rsaPublicKeyExample = new RSAPublicKeyExample();
                        String taskId = rsaPublicKeyExample.encryptedData(ivrTask1.getTaskid().toString(), pub_key);
                        for (IvrTaskSingle ivrTaskSingle : ivrTaskcalls) {
                            String patid = rsaPublicKeyExample.encryptedData(ivrTaskSingle.getPatid().toString(), pub_key);
                            SendMagParam sendMagParam = new SendMagParam();
                            sendMagParam.setType("5");
                            sendMagParam.setUrl(req_path + "/outsideChain?param1=" + taskId + "&param2=" + patid + "&param3=" + ivrTask1.getTaskName());
                            log.info("链接完整路径:{}", req_path + "/outsideChain?param1=" + taskId + "&param2=" + patid + "&param3=" + ivrTask1.getTaskName());
                            //这个模板ID先写死,后面做成可选
                            sendMagParam.setTmpCode("oG3pJHPVWpE81DmZsua_2tKwgJ97r0qz37z56ns7NB4");
                            Map map = new HashMap();
                            map.put("first", ivrTask1.getTaskName());
                            sendMagParam.setContent(JSON.toJSONString(map));
                            sendService.sendMsg(sendMagParam);
                        }
                    }
                    //通知 MQ 消息已被接收,可以ACK(从队列中删除)了   (这个需要根据业务再去处理ACK)
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                }
//                //通过多线程的方式去打电话
//                executorService.submit(new PhoneTask(ivrTaskcall1, ivrLibaTemplateVO1, redisCache, channel, message, sendService));
                new PhoneTask2().runPhone(ivrTaskcall1, ivrLibaTemplateVO1, redisCache, channel, message, sendService);
            }
        } catch (Exception e) {
            Integer integer = redisCache.getCacheObject(ivrTaskcallMQ.getTaskid().toString());
            if (integer != null && integer == 2) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else if (integer == null) {
                redisCache.setCacheObject(ivrTaskcallMQ.getTaskid().toString(), 1, 120, TimeUnit.MINUTES);
            } else {
                redisCache.setCacheObject(ivrTaskcallMQ.getTaskid().toString(), integer + 1, 120, TimeUnit.MINUTES);
            }
            log.error("============消费失败,尝试消息补发再次消费!==============");
            log.error(e.getMessage());
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
    /**
     * 机器人第一句话语音
     * concurrency = "50"  并发数为50
     */
    @RabbitListener(queues = "ob_queue", concurrency = "50")
    public void obVisit(String content, Message message, Channel channel) throws IOException {
        try {
            IvrTaskcallPhoneMQ ivrTaskcallMQ = null;
            ObjectMapper mapper = new ObjectMapper();
            ivrTaskcallMQ = mapper.readValue(content, IvrTaskcallPhoneMQ.class);
            //判断患者是否已经接电话
            String str = redisCache.getCacheObject(ivrTaskcallMQ.getUuid() + "state_id");
            System.out.println("-----------------" + str);
            if (StringUtils.isNotEmpty(str) && str.equals("0")) {
                //患者已经接听了电话
                new PhoneUtils().ttsPlayback(ivrTaskcallMQ.getScript(), ivrTaskcallMQ.getUuid());
            } else if (StringUtils.isNotEmpty(str) && str.equals("-10")) {
                //这个说明,已经打了指定遍数还是没有人接,可以将消息从队列中移除了
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            } else {
                //进行队列尾部,进行下一次校验
//                channel.basicRecover(false);
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
                System.out.println("++++++++++++++++++++++" + str);
                return;
            }
            //通知 MQ 消息已被接收,可以ACK(从队列中删除)了   (这个需要根据业务再去处理ACK)
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            log.info("=============Do Something==============");
        } catch (Exception e) {
            log.error("============消费失败,尝试消息补发再次消费!==============");
            log.error(e.getMessage());
            /**
             * basicRecover方法是进行补发操作,
             * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
             * 设置为false是只补发给当前的consumer
             */
            channel.basicRecover(false);
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
    @Bean
    public SimpleRabbitListenerContainerFactory jsonContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();