liusheng
2024-06-12 6d51501ca93f5bf46759f851988650477cf32e6c
ruoyi-admin/src/main/java/com/ruoyi/web/component/RedisMqReceiver.java
ÎļþÃû´Ó ruoyi-admin/src/main/java/com/ruoyi/web/component/RabbitMqReceiver.java ÐÞ¸Ä
@@ -3,32 +3,25 @@
import com.alibaba.fastjson2.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.rabbitmq.client.Channel;
import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.common.enums.ServiceFromEnum;
import com.ruoyi.common.utils.RSAPublicKeyExample;
import com.ruoyi.common.utils.StringUtils;
import com.ruoyi.web.task.PhoneTask;
import com.smartor.common.SendService;
import com.smartor.config.PhoneUtils;
import com.smartor.config.RabbitMqCallPhoneConfig;
import com.smartor.config.RobotPhoneUtils;
import com.smartor.domain.*;
import com.smartor.mapper.IvrTaskMapper;
import com.smartor.mapper.IvrTaskSingleMapper;
import com.smartor.service.IIvrTaskTemplateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -38,7 +31,7 @@
@Slf4j
@Component//监听此队列
public class RabbitMqReceiver {
public class RedisMqReceiver extends KeyExpirationEventMessageListener {
    @Value("${phonePath}")
    private String phonePath;
@@ -59,56 +52,51 @@
    private static RedisCache redisCache;
    private static RabbitMqCallPhoneConfig rabbitMqCallPhoneConfig;
    private static RobotPhoneUtils robotPhoneUtils;
    // åˆ›å»ºå›ºå®šå¤§å°çš„线程池
    private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
    public RedisMqReceiver(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
    @Autowired
    public void setIIvrTaskTemplateService(IIvrTaskTemplateService ivrTaskTemplateService) {
        RabbitMqReceiver.ivrTaskTemplateService = ivrTaskTemplateService;
        RedisMqReceiver.ivrTaskTemplateService = ivrTaskTemplateService;
    }
    @Autowired
    public void setRobotPhoneUtilsService(RobotPhoneUtils robotPhoneUtils) {
        RabbitMqReceiver.robotPhoneUtils = robotPhoneUtils;
        RedisMqReceiver.robotPhoneUtils = robotPhoneUtils;
    }
    @Autowired
    public void setSendService(SendService sendService) {
        RabbitMqReceiver.sendService = sendService;
    }
    @Autowired
    public void setRabbitMqCallPhoneConfig(RabbitMqCallPhoneConfig rabbitMqCallPhoneConfig) {
        RabbitMqReceiver.rabbitMqCallPhoneConfig = rabbitMqCallPhoneConfig;
        RedisMqReceiver.sendService = sendService;
    }
    @Autowired
    public void setIvrTaskcallMapper(IvrTaskSingleMapper ivrTaskcallMapper) {
        RabbitMqReceiver.ivrTaskcallMapper = ivrTaskcallMapper;
        RedisMqReceiver.ivrTaskcallMapper = ivrTaskcallMapper;
    }
    @Autowired
    public void setIvrTaskMapper(IvrTaskMapper ivrTaskMapper) {
        RabbitMqReceiver.ivrTaskMapper = ivrTaskMapper;
        RedisMqReceiver.ivrTaskMapper = ivrTaskMapper;
    }
    @Autowired
    public void setRedisCache(RedisCache redisCache) {
        RabbitMqReceiver.redisCache = redisCache;
        RedisMqReceiver.redisCache = redisCache;
    }
    /**
     * ä»»åŠ¡éšè®¿
     * concurrency = "50"  å¹¶å‘数为50
     */
    @RabbitListener(queues = "task_queue", concurrency = "50")
    public void phoneVisit(String content, Message message, Channel channel) throws IOException {
        System.out.println("消息进来了----------------");
    @Override
    public void onMessage(Message message, byte[] pattern) {
        log.info("监听Redis key过期,key:{},channel:{}", message.toString(), new String(pattern));
        String content = message.toString();
        IvrTaskcallMQ ivrTaskcallMQ = null;
        try {
            ObjectMapper mapper = new ObjectMapper();
@@ -125,13 +113,13 @@
            IvrTask ivrTask1 = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcallMQ.getTaskid());
            if (ivrTask1.getStopState() != ivrTaskcallMQ.getStopState()) {
                //将消息从队列中剔除
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            }
            if (StringUtils.isNotEmpty(ivrTaskcallMQ.getPreachform())) {
                //如何任务发送方式不为空
                String[] split = ivrTaskcallMQ.getPreachform().split(",");
                System.out.println("split的值为:" + split);
                for (String serviceFrom : split) {
                    String descByCode = ServiceFromEnum.getDescByCode(Integer.valueOf(serviceFrom));
                    //这里可以考虑用策略模式优化一下,不然太难看了
@@ -153,7 +141,7 @@
                                break;
                            }
                            //通过多线程的方式去打电话
                            executorService.submit(new PhoneTask(ivrTaskcall1, ivrTaskTemplateVO1, redisCache, rabbitMqCallPhoneConfig, message, sendService, phonePath, robotPhoneUtils));
                            executorService.submit(new PhoneTask(ivrTaskcall1, ivrTaskTemplateVO1, redisCache, sendService, phonePath, robotPhoneUtils));
                        }
                    } else if (descByCode.equals("多媒体")) {
                        //多媒体
@@ -193,13 +181,12 @@
                        }
                    }
                    //通知 MQ æ¶ˆæ¯å·²è¢«æŽ¥æ”¶,可以ACK(从队列中删除)了   ï¼ˆè¿™ä¸ªéœ€è¦æ ¹æ®ä¸šåŠ¡å†åŽ»å¤„ç†ACK)
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                }
            }
        } catch (Exception e) {
            Integer integer = redisCache.getCacheObject(ivrTaskcallMQ.getTaskid().toString());
            if (integer != null && integer == 2) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                //将消息从队列中删除
            } else if (integer == null) {
                redisCache.setCacheObject(ivrTaskcallMQ.getTaskid().toString(), 1, 120, TimeUnit.MINUTES);
            } else {
@@ -208,53 +195,45 @@
            log.error("============消费失败,尝试消息补发再次消费!==============");
            log.error(e.getMessage());
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            redisCache.setCacheObject(message.toString(), message.toString(), 60, TimeUnit.SECONDS);
        }
    }
    /**
     * æœºå™¨äººç¬¬ä¸€å¥è¯è¯­éŸ³
     * concurrency = "50"  å¹¶å‘数为50
     */
    @RabbitListener(queues = "ob_queue", concurrency = "50")
    public void obVisit(String content, Message message, Channel channel) throws IOException {
        try {
            IvrTaskcallPhoneMQ ivrTaskcallMQ = null;
            ObjectMapper mapper = new ObjectMapper();
            ivrTaskcallMQ = mapper.readValue(content, IvrTaskcallPhoneMQ.class);
            //判断患者是否已经接电话
            String str = redisCache.getCacheObject(ivrTaskcallMQ.getUuid() + "state_id");
            System.out.println("-----------------" + str);
            if (StringUtils.isNotEmpty(str) && str.equals("0")) {
                //患者已经接听了电话
                new PhoneUtils().ttsPlayback(ivrTaskcallMQ.getScript(), ivrTaskcallMQ.getUuid());
            } else if (StringUtils.isNotEmpty(str) && str.equals("-10")) {
                //这个说明,已经打了指定遍数还是没有人接,可以将消息从队列中移除了
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                return;
            } else {
                //进行队列尾部,进行下一次校验
//                channel.basicRecover(false);
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
                System.out.println("++++++++++++++++++++++" + str);
                return;
            }
            //通知 MQ æ¶ˆæ¯å·²è¢«æŽ¥æ”¶,可以ACK(从队列中删除)了   ï¼ˆè¿™ä¸ªéœ€è¦æ ¹æ®ä¸šåŠ¡å†åŽ»å¤„ç†ACK)
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("============消费失败,尝试消息补发再次消费!==============");
            log.error(e.getMessage());
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }
    }
//    /**
//     * æœºå™¨äººç¬¬ä¸€å¥è¯è¯­éŸ³
//     * concurrency = "50"  å¹¶å‘数为50
//     */
//    @RabbitListener(queues = "ob_queue", concurrency = "50")
//    public void obVisit(String content, Message message, Channel channel) throws IOException {
//        try {
//            IvrTaskcallPhoneMQ ivrTaskcallMQ = null;
//            ObjectMapper mapper = new ObjectMapper();
//            ivrTaskcallMQ = mapper.readValue(content, IvrTaskcallPhoneMQ.class);
//            //判断患者是否已经接电话
//            String str = redisCache.getCacheObject(ivrTaskcallMQ.getUuid() + "state_id");
//            System.out.println("-----------------" + str);
//            if (StringUtils.isNotEmpty(str) && str.equals("0")) {
//                //患者已经接听了电话
//                new PhoneUtils().ttsPlayback(ivrTaskcallMQ.getScript(), ivrTaskcallMQ.getUuid());
//            } else if (StringUtils.isNotEmpty(str) && str.equals("-10")) {
//                //这个说明,已经打了指定遍数还是没有人接,可以将消息从队列中移除了
//                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//                return;
//            } else {
//                //进行队列尾部,进行下一次校验
////                channel.basicRecover(false);
//                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
//                System.out.println("++++++++++++++++++++++" + str);
//                return;
//            }
//            //通知 MQ æ¶ˆæ¯å·²è¢«æŽ¥æ”¶,可以ACK(从队列中删除)了   ï¼ˆè¿™ä¸ªéœ€è¦æ ¹æ®ä¸šåŠ¡å†åŽ»å¤„ç†ACK)
//            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//        } catch (Exception e) {
//            log.error("============消费失败,尝试消息补发再次消费!==============");
//            log.error(e.getMessage());
//            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
//        }
//    }
    @Bean
    public SimpleRabbitListenerContainerFactory jsonContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }
}