| | |
| | | package com.ruoyi.web.component; |
| | | |
| | | import com.alibaba.fastjson2.JSON; |
| | | import com.fasterxml.jackson.core.JsonProcessingException; |
| | | import com.fasterxml.jackson.databind.ObjectMapper; |
| | | import com.rabbitmq.client.Channel; |
| | | import com.ruoyi.common.core.redis.RedisCache; |
| | | import com.ruoyi.common.enums.ServiceFromEnum; |
| | | import com.ruoyi.common.utils.RSAPublicKeyExample; |
| | | import com.ruoyi.common.utils.StringUtils; |
| | | import com.ruoyi.web.task.PhoneTask; |
| | | import com.smartor.config.RabbitMqConfig; |
| | | import com.smartor.domain.IvrLibaTemplateVO; |
| | | import com.smartor.domain.IvrTaskSingle; |
| | | import com.smartor.domain.IvrTaskcallMQ; |
| | | import com.smartor.common.SendService; |
| | | import com.smartor.config.PhoneUtils; |
| | | import com.smartor.config.RabbitMqCallPhoneConfig; |
| | | import com.smartor.config.RobotPhoneUtils; |
| | | import com.smartor.domain.*; |
| | | import com.smartor.mapper.IvrTaskMapper; |
| | | import com.smartor.mapper.IvrTaskSingleMapper; |
| | | import com.smartor.service.IIvrLibaTemplateService; |
| | | import com.smartor.service.IIvrTaskTemplateService; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.amqp.core.Message; |
| | | import org.springframework.amqp.rabbit.annotation.RabbitListener; |
| | | import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; |
| | | import org.springframework.amqp.rabbit.connection.ConnectionFactory; |
| | | import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.stereotype.Component; |
| | | import com.rabbitmq.client.Channel; |
| | | import org.springframework.amqp.rabbit.connection.ConnectionFactory; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ExecutorService; |
| | | import java.util.concurrent.Executors; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | @Slf4j |
| | | @Component//监听此队列 |
| | | public class RabbitMqReceiver { |
| | | |
| | | // Values bound from configuration properties. |
| | | @Value("${phonePath}") |
| | | private String phonePath; |
| | | |
| | | @Value("${pub_key}") |
| | | private String pub_key; |
| | | |
| | | @Value("${req_path}") |
| | | private String req_path; |
| | | |
| | | // Collaborators kept in static fields (assigned through @Autowired setter methods). |
| | | private static IIvrLibaTemplateService ivrLibaTemplateService; |
| | | |
| | | private static IIvrTaskTemplateService ivrTaskTemplateService; |
| | | |
| | | private static IvrTaskSingleMapper ivrTaskcallMapper; |
| | | |
| | | private static IvrTaskMapper ivrTaskMapper; |
| | | |
| | | private static SendService sendService; |
| | | |
| | | private static RedisCache redisCache; |
| | | |
| | | private static RabbitMqCallPhoneConfig rabbitMqCallPhoneConfig; |
| | | |
| | | private static RobotPhoneUtils robotPhoneUtils; |
| | | |
| | | // Fixed-size pool of 10 threads shared by all listener invocations. |
| | | private static final ExecutorService executorService = Executors.newFixedThreadPool(10); |
| | | |
| | | /** |
| | |  * Setter-injection hook: stores the injected {@link IIvrTaskTemplateService} in the static field. |
| | |  * |
| | |  * @param ivrTaskTemplateService service instance supplied through {@code @Autowired} |
| | |  */ |
| | | @Autowired |
| | | public void setIIvrTaskTemplateService(IIvrTaskTemplateService ivrTaskTemplateService) { |
| | |     RabbitMqReceiver.ivrTaskTemplateService = ivrTaskTemplateService; |
| | | } |
| | | |
| | | /** |
| | |  * Setter-injection hook: stores the injected {@link IIvrLibaTemplateService} in the static field. |
| | |  * |
| | |  * @param ivrLibaTemplateService service instance supplied through {@code @Autowired} |
| | |  */ |
| | | @Autowired |
| | | public void setIIvrLibaTemplateService(IIvrLibaTemplateService ivrLibaTemplateService) { |
| | |     RabbitMqReceiver.ivrLibaTemplateService = ivrLibaTemplateService; |
| | | } |
| | | |
| | | /** |
| | |  * Setter-injection hook: stores the injected {@link RobotPhoneUtils} in the static field. |
| | |  * |
| | |  * @param robotPhoneUtils helper instance supplied through {@code @Autowired} |
| | |  */ |
| | | @Autowired |
| | | public void setRobotPhoneUtilsService(RobotPhoneUtils robotPhoneUtils) { |
| | |     RabbitMqReceiver.robotPhoneUtils = robotPhoneUtils; |
| | | } |
| | | |
| | | /** |
| | |  * Setter-injection hook: stores the injected {@link SendService} in the static field. |
| | |  * |
| | |  * @param sendService service instance supplied through {@code @Autowired} |
| | |  */ |
| | | @Autowired |
| | | public void setSendService(SendService sendService) { |
| | |     RabbitMqReceiver.sendService = sendService; |
| | | } |
| | | |
| | | /** |
| | |  * Setter-injection hook: stores the injected {@link RabbitMqCallPhoneConfig} in the static field. |
| | |  * |
| | |  * @param rabbitMqCallPhoneConfig configuration instance supplied through {@code @Autowired} |
| | |  */ |
| | | @Autowired |
| | | public void setRabbitMqCallPhoneConfig(RabbitMqCallPhoneConfig rabbitMqCallPhoneConfig) { |
| | |     RabbitMqReceiver.rabbitMqCallPhoneConfig = rabbitMqCallPhoneConfig; |
| | | } |
| | | |
| | | @Autowired |
| | |
| | | } |
| | | |
| | | /** |
| | |  * Setter-injection hook: stores the injected {@link IvrTaskMapper} in the static field. |
| | |  * |
| | |  * @param ivrTaskMapper mapper instance supplied through {@code @Autowired} |
| | |  */ |
| | | @Autowired |
| | | public void setIvrTaskMapper(IvrTaskMapper ivrTaskMapper) { |
| | |     RabbitMqReceiver.ivrTaskMapper = ivrTaskMapper; |
| | | } |
| | | |
| | | /** |
| | |  * Setter-injection hook: stores the injected {@link RedisCache} in the static field. |
| | |  * |
| | |  * @param redisCache cache facade supplied through {@code @Autowired} |
| | |  */ |
| | | @Autowired |
| | | public void setRedisCache(RedisCache redisCache) { |
| | |     RabbitMqReceiver.redisCache = redisCache; |
| | | } |
| | | |
| | | // @RabbitListener(queues = RabbitMqConfig.delay_queue) |
| | | // public void consultReceiveDealy(String content, Message message, Channel channel) throws IOException { |
| | | // log.info("----------------接收延迟队列消息--------------------"); |
| | | // //通知 MQ 消息已被接收,可以ACK(从队列中删除)了 |
| | | // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); |
| | | // try { |
| | | // log.info("=============Do Something=============="); |
| | | // } catch (Exception e) { |
| | | // log.error("============消费失败,尝试消息补发再次消费!=============="); |
| | | // log.error(e.getMessage()); |
| | | // /** |
| | | // * basicRecover方法是进行补发操作, |
| | | // * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到, |
| | | // * 设置为false是只补发给当前的consumer |
| | | // */ |
| | | // channel.basicRecover(false); |
| | | // } |
| | | // } |
| | | |
| | | /** |
| | |  * Consumes a follow-up task message and dispatches it over each delivery channel of the task. |
| | |  * <p> |
| | |  * Steps: parse {@code content} into an {@link IvrTaskcallMQ}; acknowledge and drop the message |
| | |  * when its stop state differs from the stored task's (per the original author's note this means |
| | |  * the task was paused); load the task's recipients; for every code in the comma-separated |
| | |  * {@code preachform}, resolve the channel name via {@link ServiceFromEnum} and either submit one |
| | |  * {@link PhoneTask} per recipient to the thread pool or send an SMS / official-account link |
| | |  * through {@link SendService}. The delivery is acknowledged once, after all channels ran. |
| | |  * <p> |
| | |  * Any exception is routed to {@link #handlePhoneVisitFailure}. |
| | |  * <p> |
| | |  * Each listener is declared with {@code concurrency = "50"}. |
| | |  * |
| | |  * @param content raw message body: a JSON object, possibly missing its enclosing braces |
| | |  * @param message AMQP message; supplies the delivery tag and is handed to each {@link PhoneTask} |
| | |  * @param channel channel used to ack / reject the delivery |
| | |  * @throws IOException if acknowledging or rejecting the delivery fails |
| | |  */ |
| | | @RabbitListener(queues = RabbitMqConfig.phone_queue, concurrency = "50") |
| | | @RabbitListener(queues = "task_queue", concurrency = "50") |
| | | public void phoneVisit(String content, Message message, Channel channel) throws IOException { |
| | |     log.info("消息进来了----------------"); |
| | |     final long deliveryTag = message.getMessageProperties().getDeliveryTag(); |
| | |     // Declared outside the try block so the failure handler can read the task id. |
| | |     IvrTaskcallMQ ivrTaskcallMQ = null; |
| | |     try { |
| | |         ivrTaskcallMQ = parsePhoneVisitMessage(content); |
| | | |
| | |         // A stop state that no longer matches the stored task means the task was paused |
| | |         // after this message was queued: remove the message and do nothing. |
| | |         IvrTask ivrTask1 = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcallMQ.getTaskid()); |
| | |         // NOTE(review): if getStopState() returns a boxed type, != compares references - confirm the type. |
| | |         if (ivrTask1.getStopState() != ivrTaskcallMQ.getStopState()) { |
| | |             channel.basicAck(deliveryTag, false); |
| | |             return; |
| | |         } |
| | | |
| | |         // Load the recipients of this task. |
| | |         IvrTaskSingle recipientQuery = new IvrTaskSingle(); |
| | |         recipientQuery.setTaskid(ivrTaskcallMQ.getTaskid()); |
| | |         List<IvrTaskSingle> ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(recipientQuery); |
| | | |
| | |         // NOTE(review): a message without preachform is neither acked nor rejected (unchanged) - confirm intent. |
| | |         if (StringUtils.isNotEmpty(ivrTaskcallMQ.getPreachform())) { |
| | |             for (String serviceFrom : ivrTaskcallMQ.getPreachform().split(",")) { |
| | |                 String descByCode = ServiceFromEnum.getDescByCode(Integer.valueOf(serviceFrom)); |
| | |                 if ("电话".equals(descByCode)) { |
| | |                     // Phone: look up the task template, then queue one call per recipient. |
| | |                     IvrTaskTemplateVO templateQuery = new IvrTaskTemplateVO(); |
| | |                     templateQuery.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid())); |
| | |                     IvrTaskTemplateVO ivrTaskTemplateVO1 = ivrTaskTemplateService.selectInfoByCondition(templateQuery); |
| | |                     for (IvrTaskSingle ivrTaskcall1 : ivrTaskcalls) { |
| | |                         // Re-read the task before every call so a pause / termination stops the batch. |
| | |                         IvrTask ivrTask = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcall1.getTaskid()); |
| | |                         if (ivrTask.getSendState() != null && (ivrTask.getSendState() == 3 || ivrTask.getSendState() == 4)) { |
| | |                             break; |
| | |                         } |
| | |                         executorService.submit(new PhoneTask(ivrTaskcall1, ivrTaskTemplateVO1, redisCache, rabbitMqCallPhoneConfig, message, sendService, phonePath, robotPhoneUtils)); |
| | |                     } |
| | |                 } else if ("多媒体".equals(descByCode)) { |
| | |                     // Multimedia: no delivery implemented. |
| | |                 } else if ("纸质".equals(descByCode)) { |
| | |                     // Paper: no delivery implemented. |
| | |                 } else if ("短信".equals(descByCode)) { |
| | |                     // SMS: send a link whose two query parameters are encrypted with pub_key, |
| | |                     // e.g. http://localhost:8099/followvisit/particty?param1=3&param2=348 |
| | |                     RSAPublicKeyExample rsaPublicKeyExample = new RSAPublicKeyExample(); |
| | |                     String taskId = rsaPublicKeyExample.encryptedData(ivrTask1.getTaskid().toString(), pub_key); |
| | |                     for (IvrTaskSingle ivrTaskSingle : ivrTaskcalls) { |
| | |                         String patid = rsaPublicKeyExample.encryptedData(ivrTaskSingle.getPatid().toString(), pub_key); |
| | |                         SendMagParam sendMagParam = new SendMagParam(); |
| | |                         sendMagParam.setType("4"); |
| | |                         sendMagParam.setUrl(req_path + "/followvisit/particty?param1=" + taskId + "&param2=" + patid); |
| | |                         sendService.sendMsg(sendMagParam); |
| | |                     } |
| | |                 } else if ("公众号".equals(descByCode)) { |
| | |                     // Official account: same encrypted parameters plus the task name. |
| | |                     RSAPublicKeyExample rsaPublicKeyExample = new RSAPublicKeyExample(); |
| | |                     String taskId = rsaPublicKeyExample.encryptedData(ivrTask1.getTaskid().toString(), pub_key); |
| | |                     for (IvrTaskSingle ivrTaskSingle : ivrTaskcalls) { |
| | |                         String patid = rsaPublicKeyExample.encryptedData(ivrTaskSingle.getPatid().toString(), pub_key); |
| | |                         // NOTE(review): the task name is appended without URL encoding - confirm the receiver copes. |
| | |                         String url = req_path + "/outsideChain?param1=" + taskId + "&param2=" + patid + "&param3=" + ivrTask1.getTaskName(); |
| | |                         log.info("链接完整路径:{}", url); |
| | |                         SendMagParam sendMagParam = new SendMagParam(); |
| | |                         sendMagParam.setType("5"); |
| | |                         sendMagParam.setUrl(url); |
| | |                         // The template id is hard-coded for now; the original author planned to make it selectable. |
| | |                         sendMagParam.setTmpCode("oG3pJHPVWpE81DmZsua_2tKwgJ97r0qz37z56ns7NB4"); |
| | |                         Map<String, Object> map = new HashMap<>(); |
| | |                         map.put("first", ivrTask1.getTaskName()); |
| | |                         sendMagParam.setContent(JSON.toJSONString(map)); |
| | |                         sendService.sendMsg(sendMagParam); |
| | |                     } |
| | |                 } |
| | |             } |
| | |             // Ack exactly once, after every channel was dispatched: acking the same delivery tag |
| | |             // a second time is a channel-level protocol error. |
| | |             channel.basicAck(deliveryTag, false); |
| | |         } |
| | |     } catch (Exception e) { |
| | |         handlePhoneVisitFailure(ivrTaskcallMQ, deliveryTag, channel, e); |
| | |     } |
| | | } |
| | | |
| | | /** |
| | |  * Parses the message body, wrapping it in braces first when it does not start with one. |
| | |  * |
| | |  * @param content raw message body |
| | |  * @return the deserialized task message |
| | |  * @throws JsonProcessingException if the body is not valid JSON for {@link IvrTaskcallMQ} |
| | |  */ |
| | | private IvrTaskcallMQ parsePhoneVisitMessage(String content) throws JsonProcessingException { |
| | |     String json = content.startsWith("{") ? content : "{" + content + "}"; |
| | |     return new ObjectMapper().readValue(json, IvrTaskcallMQ.class); |
| | | } |
| | | |
| | | /** |
| | |  * Decides what happens to a delivery whose processing threw. |
| | |  * <p> |
| | |  * A counter stored in Redis under the task id (120 minute expiry) tracks failures: once it has |
| | |  * reached 2 the delivery is acknowledged and dropped, otherwise the counter is incremented and |
| | |  * the delivery is rejected with requeue. A message that could not be parsed has no task id to |
| | |  * count against and is rejected without requeue, since it can never be processed. |
| | |  * |
| | |  * @param ivrTaskcallMQ parsed message, or {@code null} when parsing failed |
| | |  * @param deliveryTag   delivery tag of the failed message |
| | |  * @param channel       channel used to ack / reject |
| | |  * @param e             the failure that aborted processing |
| | |  * @throws IOException if acknowledging or rejecting the delivery fails |
| | |  */ |
| | | private void handlePhoneVisitFailure(IvrTaskcallMQ ivrTaskcallMQ, long deliveryTag, Channel channel, Exception e) throws IOException { |
| | |     log.error("============消费失败,尝试消息补发再次消费!==============", e); |
| | |     if (ivrTaskcallMQ == null || ivrTaskcallMQ.getTaskid() == null) { |
| | |         channel.basicReject(deliveryTag, false); |
| | |         return; |
| | |     } |
| | |     String retryKey = ivrTaskcallMQ.getTaskid().toString(); |
| | |     Integer failures = redisCache.getCacheObject(retryKey); |
| | |     if (failures != null && failures >= 2) { |
| | |         // Give up: ack only. Rejecting the same tag afterwards would be a second settlement. |
| | |         channel.basicAck(deliveryTag, false); |
| | |         return; |
| | |     } |
| | |     redisCache.setCacheObject(retryKey, failures == null ? 1 : failures + 1, 120, TimeUnit.MINUTES); |
| | |     channel.basicReject(deliveryTag, true); |
| | | } |
| | | |
| | | /** |
| | | * 机器人第一句话语音 |
| | | * concurrency = "50" 并发数为50 |
| | | */ |
| | | @RabbitListener(queues = "ob_queue", concurrency = "50") |
| | | public void obVisit(String content, Message message, Channel channel) throws IOException { |
| | | try { |
| | | IvrTaskcallPhoneMQ ivrTaskcallMQ = null; |
| | | ObjectMapper mapper = new ObjectMapper(); |
| | | ivrTaskcallMQ = mapper.readValue(content, IvrTaskcallPhoneMQ.class); |
| | | //判断患者是否已经接电话 |
| | | String str = redisCache.getCacheObject(ivrTaskcallMQ.getUuid() + "state_id"); |
| | | System.out.println("-----------------" + str); |
| | | if (StringUtils.isNotEmpty(str) && str.equals("0")) { |
| | | //患者已经接听了电话 |
| | | new PhoneUtils().ttsPlayback(ivrTaskcallMQ.getScript(), ivrTaskcallMQ.getUuid()); |
| | | } else if (StringUtils.isNotEmpty(str) && str.equals("-10")) { |
| | | //这个说明,已经打了指定遍数还是没有人接,可以将消息从队列中移除了 |
| | | channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); |
| | | return; |
| | | } else { |
| | | //进行队列尾部,进行下一次校验 |
| | | // channel.basicRecover(false); |
| | | channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); |
| | | System.out.println("++++++++++++++++++++++" + str); |
| | | return; |
| | | } |
| | | //通知 MQ 消息已被接收,可以ACK(从队列中删除)了 (这个需要根据业务再去处理ACK) |
| | | channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); |
| | | |
| | | log.info("=============Do Something=============="); |
| | | } catch (Exception e) { |
| | | log.error("============消费失败,尝试消息补发再次消费!=============="); |
| | | log.error(e.getMessage()); |
| | | /** |
| | | * basicRecover方法是进行补发操作, |
| | | * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到, |
| | | * 设置为false是只补发给当前的consumer |
| | | */ |
| | | channel.basicRecover(false); |
| | | channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); |
| | | } |
| | | } |
| | | |
| | | |
| | | @Bean |
| | | public SimpleRabbitListenerContainerFactory jsonContainerFactory(ConnectionFactory connectionFactory) { |
| | | SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); |