package com.ruoyi.web.component; import com.alibaba.fastjson2.JSON; import com.fasterxml.jackson.databind.ObjectMapper; import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.common.enums.ServiceFromEnum; import com.ruoyi.common.utils.RSAPublicKeyExample; import com.ruoyi.common.utils.StringUtils; import com.ruoyi.web.task.PhoneTask; import com.smartor.common.SendService; import com.smartor.config.RobotPhoneUtils; import com.smartor.domain.*; import com.smartor.mapper.ServiceSubtaskMapper; import com.smartor.mapper.ServiceTaskMapper; import com.smartor.mapper.SvyTaskMapper; import com.smartor.mapper.SvyTaskSingleMapper; import com.smartor.service.IIvrTaskTemplateService; import com.smartor.service.IServiceSubtaskRecordService; import com.smartor.service.ISvyTaskTemplateService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.listener.KeyExpirationEventMessageListener; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @Slf4j @Component//监听此队列 public class RedisMqReceiver extends KeyExpirationEventMessageListener { @Value("${phonePath}") private String phonePath; @Value("${pub_key}") private String pub_key; @Value("${req_path}") private String req_path; private static IIvrTaskTemplateService ivrTaskTemplateService; private static ServiceSubtaskMapper ivrTaskcallMapper; private static ServiceTaskMapper ivrTaskMapper; private static SendService sendService; private static RedisCache redisCache; private static RobotPhoneUtils robotPhoneUtils; @Autowired private SvyTaskMapper svyTaskMapper; @Autowired private SvyTaskSingleMapper svyTaskSingleMapper; @Autowired private ISvyTaskTemplateService iSvyTaskTemplateService; @Autowired private IServiceSubtaskRecordService serviceSubtaskRecordService; // 创建固定大小的线程池 private static final ExecutorService executorService = Executors.newFixedThreadPool(10); public RedisMqReceiver(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @Autowired public void setIIvrTaskTemplateService(IIvrTaskTemplateService ivrTaskTemplateService) { RedisMqReceiver.ivrTaskTemplateService = ivrTaskTemplateService; } @Autowired public void setRobotPhoneUtilsService(RobotPhoneUtils robotPhoneUtils) { RedisMqReceiver.robotPhoneUtils = robotPhoneUtils; } @Autowired public void setSendService(SendService sendService) { RedisMqReceiver.sendService = sendService; } @Autowired public void setIvrTaskcallMapper(ServiceSubtaskMapper ivrTaskcallMapper) { RedisMqReceiver.ivrTaskcallMapper = ivrTaskcallMapper; } @Autowired public void setIvrTaskMapper(ServiceTaskMapper ivrTaskMapper) { RedisMqReceiver.ivrTaskMapper = ivrTaskMapper; } @Autowired public void setRedisCache(RedisCache redisCache) { RedisMqReceiver.redisCache = redisCache; } @Override public void onMessage(Message message, byte[] pattern) { log.info("监听Redis key过期,key:{},channel:{}", message.toString(), new String(pattern)); String content = message.toString(); //判断是不是任务信息,如果不是,直接返回,不需要执行 if (!content.contains("taskid")) { log.info("不是任务信息"); return; } CommonTaskcallMQ commonTaskcallMQ = null; try { ObjectMapper mapper = new ObjectMapper(); if (!content.startsWith("{")) { commonTaskcallMQ = mapper.readValue("{" + content + "}", CommonTaskcallMQ.class); } else { commonTaskcallMQ = mapper.readValue(content, CommonTaskcallMQ.class); } if (commonTaskcallMQ.getTaskType() == 1) { //随访 sfHandle((commonTaskcallMQ)); } else if (commonTaskcallMQ.getTaskType() == 2) { //问卷 wjHandle(commonTaskcallMQ); } else if (commonTaskcallMQ.getTaskType() == 3) { //宣教 } } catch (Exception e) { Integer integer = redisCache.getCacheObject(commonTaskcallMQ.getTaskid().toString()); if (integer != null && integer == 2) { //将消息从队列中删除 } else if (integer == null) { redisCache.setCacheObject(commonTaskcallMQ.getTaskid().toString(), 1, 120, TimeUnit.MINUTES); } else { redisCache.setCacheObject(commonTaskcallMQ.getTaskid().toString(), integer + 1, 120, TimeUnit.MINUTES); } log.error("============消费失败,尝试消息补发再次消费!============== {}", e.getMessage()); redisCache.setCacheObject(message.toString(), message.toString(), 60, TimeUnit.SECONDS); } } /** * 随访任务处理 * * @param commonTaskcallMQ */ private void sfHandle(CommonTaskcallMQ commonTaskcallMQ) { //判断一下commonTaskcallMQ中的stopstate是否与ivr_task中的一致,不一致,则说明是暂停了 ServiceTask ivrTask1 = ivrTaskMapper.selectServiceTaskByTaskid(commonTaskcallMQ.getTaskid()); if (ivrTask1.getStopState() != commonTaskcallMQ.getStopState()) { //将消息从队列中剔除 return; } if (StringUtils.isNotEmpty(commonTaskcallMQ.getPreachform())) { //如何任务发送方式不为空 String[] split = commonTaskcallMQ.getPreachform().split(","); System.out.println("split的值为:" + split); for (String serviceFrom : split) { String descByCode = ServiceFromEnum.getDescByCode(Integer.valueOf(serviceFrom)); //这里可以考虑用策略模式优化一下,不然太难看了 //通过模板ID获取模板问题 IvrTaskTemplateVO ivrTaskTemplateVO = new IvrTaskTemplateVO(); ivrTaskTemplateVO.setID(Long.valueOf(commonTaskcallMQ.getTemplateid())); IvrTaskTemplateVO ivrTaskTemplateVO1 = ivrTaskTemplateService.selectInfoByCondition(ivrTaskTemplateVO); //通过任务ID拿到患者信息 ServiceSubtask ivrTaskcall = new ServiceSubtask(); ivrTaskcall.setTaskid(commonTaskcallMQ.getTaskid()); List ivrTaskcalls = ivrTaskcallMapper.selectServiceSubtaskList(ivrTaskcall); if (descByCode.equals("电话")) { for (ServiceSubtask ivrTaskcall1 : ivrTaskcalls) { ServiceTask ivrTask = ivrTaskMapper.selectServiceTaskByTaskid(ivrTaskcall1.getTaskid()); if (ivrTask.getSendState() != null && ivrTask.getSendState() == 3 || ivrTask.getSendState() != null && ivrTask.getSendState() == 4) { //如何任务被“暂停”或“终止” break; } //任务发送记录(这个代码应该写在phoneTask中,先写在这,后面再改) ServiceSubtaskRecord serviceSubtaskRecord = new ServiceSubtaskRecord(); serviceSubtaskRecord.setTaskid(ivrTaskcall1.getTaskid().toString()); serviceSubtaskRecord.setUuid(UUID.randomUUID().toString()); serviceSubtaskRecord.setTasktype(ivrTaskcall1.getType()); serviceSubtaskRecord.setPreachform("3"); serviceSubtaskRecord.setStartTime(System.currentTimeMillis()); serviceSubtaskRecordService.insertServiceSubtaskRecord(serviceSubtaskRecord); //通过多线程的方式去打电话 executorService.submit(new PhoneTask(ivrTaskcall1, ivrTaskTemplateVO1, redisCache, sendService, phonePath, robotPhoneUtils)); } } else if (descByCode.equals("多媒体")) { //多媒体 } else if (descByCode.equals("纸质")) { //纸质 } else if (descByCode.equals("短信")) { //短信 // http://localhost:8099/followvisit/particty?param1=3¶m2=348 //对url中两个参数加密 RSAPublicKeyExample rsaPublicKeyExample = new RSAPublicKeyExample(); String taskId = rsaPublicKeyExample.encryptedData(ivrTask1.getTaskid().toString(), pub_key); for (ServiceSubtask ivrTaskSingle : ivrTaskcalls) { String patid = rsaPublicKeyExample.encryptedData(ivrTaskSingle.getPatid().toString(), pub_key); SendMagParam sendMagParam = new SendMagParam(); sendMagParam.setType("4"); sendMagParam.setUrl(req_path + "/followvisit/particty?param1=" + taskId + "¶m2=" + patid); Boolean aBoolean = sendService.sendMsg(sendMagParam); //任务发送记录 ServiceSubtaskRecord serviceSubtaskRecord = new ServiceSubtaskRecord(); serviceSubtaskRecord.setTaskid(ivrTaskSingle.getTaskid().toString()); serviceSubtaskRecord.setUuid(UUID.randomUUID().toString()); serviceSubtaskRecord.setTasktype(ivrTaskSingle.getType()); serviceSubtaskRecord.setPreachform("4"); serviceSubtaskRecord.setStartTime(System.currentTimeMillis()); serviceSubtaskRecord.setResult(aBoolean == true ? "成功" : "失败"); serviceSubtaskRecordService.insertServiceSubtaskRecord(serviceSubtaskRecord); } } else if (descByCode.equals("公众号")) { //公众号 RSAPublicKeyExample rsaPublicKeyExample = new RSAPublicKeyExample(); String taskId = rsaPublicKeyExample.encryptedData(ivrTask1.getTaskid().toString(), pub_key); for (ServiceSubtask ivrTaskSingle : ivrTaskcalls) { String patid = rsaPublicKeyExample.encryptedData(ivrTaskSingle.getPatid().toString(), pub_key); SendMagParam sendMagParam = new SendMagParam(); sendMagParam.setType("5"); sendMagParam.setUrl(req_path + "/outsideChain?param1=" + taskId + "¶m2=" + patid + "¶m3=" + ivrTask1.getTaskName()); log.info("链接完整路径:{}", req_path + "/outsideChain?param1=" + taskId + "¶m2=" + patid + "¶m3=" + ivrTask1.getTaskName()); //这个模板ID先写死,后面做成可选 sendMagParam.setTmpCode("oG3pJHPVWpE81DmZsua_2tKwgJ97r0qz37z56ns7NB4"); Map map = new HashMap(); map.put("first", ivrTask1.getTaskName()); sendMagParam.setContent(JSON.toJSONString(map)); Boolean aBoolean = sendService.sendMsg(sendMagParam); //任务发送记录 ServiceSubtaskRecord serviceSubtaskRecord = new ServiceSubtaskRecord(); serviceSubtaskRecord.setTaskid(ivrTaskSingle.getTaskid().toString()); serviceSubtaskRecord.setUuid(UUID.randomUUID().toString()); serviceSubtaskRecord.setTasktype(ivrTaskSingle.getType()); serviceSubtaskRecord.setPreachform("4"); serviceSubtaskRecord.setStartTime(System.currentTimeMillis()); serviceSubtaskRecord.setResult(aBoolean == true ? "成功" : "失败"); serviceSubtaskRecordService.insertServiceSubtaskRecord(serviceSubtaskRecord); } } //通知 MQ 消息已被接收,可以ACK(从队列中删除)了 (这个需要根据业务再去处理ACK) } } } /** * 宣教任务处理 * * @param commonTaskcallMQ */ private void xjHandle(CommonTaskcallMQ commonTaskcallMQ) { //判断一下commonTaskcallMQ中的stopstate是否与ivr_task中的一致,不一致,则说明是暂停了 ServiceTask ivrTask1 = ivrTaskMapper.selectServiceTaskByTaskid(commonTaskcallMQ.getTaskid()); if (ivrTask1.getStopState() != commonTaskcallMQ.getStopState()) { //将消息从队列中剔除 return; } if (StringUtils.isNotEmpty(commonTaskcallMQ.getPreachform())) { //如何任务发送方式不为空 String[] split = commonTaskcallMQ.getPreachform().split(","); System.out.println("split的值为:" + split); for (String serviceFrom : split) { String descByCode = ServiceFromEnum.getDescByCode(Integer.valueOf(serviceFrom)); //这里可以考虑用策略模式优化一下,不然太难看了 //通过模板ID获取模板问题 IvrTaskTemplateVO ivrTaskTemplateVO = new IvrTaskTemplateVO(); ivrTaskTemplateVO.setID(Long.valueOf(commonTaskcallMQ.getTemplateid())); IvrTaskTemplateVO ivrTaskTemplateVO1 = ivrTaskTemplateService.selectInfoByCondition(ivrTaskTemplateVO); //通过任务ID拿到患者信息 ServiceSubtask ivrTaskcall = new ServiceSubtask(); ivrTaskcall.setTaskid(commonTaskcallMQ.getTaskid()); List ivrTaskcalls = ivrTaskcallMapper.selectServiceSubtaskList(ivrTaskcall); if (descByCode.equals("电话")) { for (ServiceSubtask ivrTaskcall1 : ivrTaskcalls) { ServiceTask ivrTask = ivrTaskMapper.selectServiceTaskByTaskid(ivrTaskcall1.getTaskid()); if (ivrTask.getSendState() != null && ivrTask.getSendState() == 3 || ivrTask.getSendState() != null && ivrTask.getSendState() == 4) { //如何任务被“暂停”或“终止” break; } //任务发送记录(这个代码应该写在phoneTask中,先写在这,后面再改) ServiceSubtaskRecord serviceSubtaskRecord = new ServiceSubtaskRecord(); serviceSubtaskRecord.setTaskid(ivrTaskcall1.getTaskid().toString()); serviceSubtaskRecord.setUuid(UUID.randomUUID().toString()); serviceSubtaskRecord.setTasktype(ivrTaskcall1.getType()); serviceSubtaskRecord.setPreachform("3"); serviceSubtaskRecord.setStartTime(System.currentTimeMillis()); serviceSubtaskRecordService.insertServiceSubtaskRecord(serviceSubtaskRecord); //通过多线程的方式去打电话 executorService.submit(new PhoneTask(ivrTaskcall1, ivrTaskTemplateVO1, redisCache, sendService, phonePath, robotPhoneUtils)); } } else if (descByCode.equals("多媒体")) { //多媒体 } else if (descByCode.equals("纸质")) { //纸质 } else if (descByCode.equals("短信")) { //短信 // http://localhost:8099/followvisit/particty?param1=3¶m2=348 //对url中两个参数加密 RSAPublicKeyExample rsaPublicKeyExample = new RSAPublicKeyExample(); String taskId = rsaPublicKeyExample.encryptedData(ivrTask1.getTaskid().toString(), pub_key); for (ServiceSubtask ivrTaskSingle : ivrTaskcalls) { String patid = rsaPublicKeyExample.encryptedData(ivrTaskSingle.getPatid().toString(), pub_key); SendMagParam sendMagParam = new SendMagParam(); sendMagParam.setType("4"); sendMagParam.setUrl(req_path + "/followvisit/particty?param1=" + taskId + "¶m2=" + patid); Boolean aBoolean = sendService.sendMsg(sendMagParam); //任务发送记录 ServiceSubtaskRecord serviceSubtaskRecord = new ServiceSubtaskRecord(); serviceSubtaskRecord.setTaskid(ivrTaskSingle.getTaskid().toString()); serviceSubtaskRecord.setUuid(UUID.randomUUID().toString()); serviceSubtaskRecord.setTasktype(ivrTaskSingle.getType()); serviceSubtaskRecord.setPreachform("4"); serviceSubtaskRecord.setStartTime(System.currentTimeMillis()); serviceSubtaskRecord.setResult(aBoolean == true ? "成功" : "失败"); serviceSubtaskRecordService.insertServiceSubtaskRecord(serviceSubtaskRecord); } } else if (descByCode.equals("公众号")) { //公众号 RSAPublicKeyExample rsaPublicKeyExample = new RSAPublicKeyExample(); String taskId = rsaPublicKeyExample.encryptedData(ivrTask1.getTaskid().toString(), pub_key); for (ServiceSubtask ivrTaskSingle : ivrTaskcalls) { String patid = rsaPublicKeyExample.encryptedData(ivrTaskSingle.getPatid().toString(), pub_key); SendMagParam sendMagParam = new SendMagParam(); sendMagParam.setType("5"); sendMagParam.setUrl(req_path + "/outsideChain?param1=" + taskId + "¶m2=" + patid + "¶m3=" + ivrTask1.getTaskName()); log.info("链接完整路径:{}", req_path + "/outsideChain?param1=" + taskId + "¶m2=" + patid + "¶m3=" + ivrTask1.getTaskName()); //这个模板ID先写死,后面做成可选 sendMagParam.setTmpCode("oG3pJHPVWpE81DmZsua_2tKwgJ97r0qz37z56ns7NB4"); Map map = new HashMap(); map.put("first", ivrTask1.getTaskName()); sendMagParam.setContent(JSON.toJSONString(map)); Boolean aBoolean = sendService.sendMsg(sendMagParam); //任务发送记录 ServiceSubtaskRecord serviceSubtaskRecord = new ServiceSubtaskRecord(); serviceSubtaskRecord.setTaskid(ivrTaskSingle.getTaskid().toString()); serviceSubtaskRecord.setUuid(UUID.randomUUID().toString()); serviceSubtaskRecord.setTasktype(ivrTaskSingle.getType()); serviceSubtaskRecord.setPreachform("4"); serviceSubtaskRecord.setStartTime(System.currentTimeMillis()); serviceSubtaskRecord.setResult(aBoolean == true ? "成功" : "失败"); serviceSubtaskRecordService.insertServiceSubtaskRecord(serviceSubtaskRecord); } } //通知 MQ 消息已被接收,可以ACK(从队列中删除)了 (这个需要根据业务再去处理ACK) } } } /** * 问卷任务处理 * * @param commonTaskcallMQ */ private void wjHandle(CommonTaskcallMQ commonTaskcallMQ) { //判断一下commonTaskcallMQ中的stopstate是否与ivr_task中的一致,不一致,则说明是暂停了 SvyTask svyTask = svyTaskMapper.selectSvyTaskByTaskid(commonTaskcallMQ.getTaskid()); if (svyTask.getStopState() != commonTaskcallMQ.getStopState()) { //将消息从队列中剔除 return; } if (StringUtils.isNotEmpty(commonTaskcallMQ.getPreachform())) { //如何任务发送方式不为空 String[] split = commonTaskcallMQ.getPreachform().split(","); System.out.println("split的值为:" + split); for (String serviceFrom : split) { String descByCode = ServiceFromEnum.getDescByCode(Integer.valueOf(serviceFrom)); //这里可以考虑用策略模式优化一下,不然太难看了 //通过模板ID获取模板问题 SvyTaskTemplateVO svyTaskTemplateVO = new SvyTaskTemplateVO(); svyTaskTemplateVO.setId(Long.valueOf(commonTaskcallMQ.getTemplateid())); SvyTaskTemplateVO svyTaskTemplateVO1 = iSvyTaskTemplateService.selectInfoByCondition(svyTaskTemplateVO); //通过任务ID拿到患者信息 SvyTaskSingle svyTaskSingle = new SvyTaskSingle(); svyTaskSingle.setTaskid(commonTaskcallMQ.getTaskid()); List svyTaskSingles = svyTaskSingleMapper.selectSvyTaskSingleList(svyTaskSingle); if (descByCode.equals("电话")) { for (SvyTaskSingle svyTaskSingle1 : svyTaskSingles) { SvyTask svyTask1 = svyTaskMapper.selectSvyTaskByTaskid(svyTaskSingle1.getTaskid()); if (svyTask1.getSendState() != null && svyTask1.getSendState().equals("3") || svyTask1.getSendState() != null && svyTask1.getSendState().equals("4")) { //如何任务被“暂停”或“终止” break; } //任务发送记录(这个代码应该写在phoneTask中,先写在这,后面再改) ServiceSubtaskRecord serviceSubtaskRecord = new ServiceSubtaskRecord(); serviceSubtaskRecord.setTaskid(svyTaskSingle1.getTaskid().toString()); serviceSubtaskRecord.setUuid(UUID.randomUUID().toString()); serviceSubtaskRecord.setTasktype(svyTaskSingle1.getType()); serviceSubtaskRecord.setPreachform("3"); serviceSubtaskRecord.setStartTime(System.currentTimeMillis()); serviceSubtaskRecordService.insertServiceSubtaskRecord(serviceSubtaskRecord); //通过多线程的方式去打电话 // executorService.submit(new PhoneTask(ivrTaskcall1, ivrTaskTemplateVO1, redisCache, sendService, phonePath, robotPhoneUtils)); } } else if (descByCode.equals("多媒体")) { //多媒体 } else if (descByCode.equals("纸质")) { //纸质 } else if (descByCode.equals("短信")) { //短信 } else if (descByCode.equals("公众号")) { //公众号 RSAPublicKeyExample rsaPublicKeyExample = new RSAPublicKeyExample(); String taskId = rsaPublicKeyExample.encryptedData(svyTask.getTaskid().toString(), pub_key); for (SvyTaskSingle svyTaskSingle1 : svyTaskSingles) { String patid = rsaPublicKeyExample.encryptedData(svyTaskSingle1.getPatid().toString(), pub_key); SendMagParam sendMagParam = new SendMagParam(); sendMagParam.setType("5"); sendMagParam.setUrl(req_path + "/outsideChain?param1=" + taskId + "¶m2=" + patid + "¶m3=" + svyTask.getTaskName()); log.info("链接完整路径:{}", req_path + "/outsideChain?param1=" + taskId + "¶m2=" + patid + "¶m3=" + svyTask.getTaskName()); //这个模板ID先写死,后面做成可选 sendMagParam.setTmpCode("oG3pJHPVWpE81DmZsua_2tKwgJ97r0qz37z56ns7NB4"); Map map = new HashMap(); map.put("first", svyTask.getTaskName()); sendMagParam.setContent(JSON.toJSONString(map)); Boolean aBoolean = sendService.sendMsg(sendMagParam); //任务发送记录 ServiceSubtaskRecord serviceSubtaskRecord = new ServiceSubtaskRecord(); serviceSubtaskRecord.setTaskid(svyTaskSingle1.getTaskid().toString()); serviceSubtaskRecord.setUuid(UUID.randomUUID().toString()); serviceSubtaskRecord.setTasktype(svyTaskSingle1.getType()); serviceSubtaskRecord.setPreachform("4"); serviceSubtaskRecord.setStartTime(System.currentTimeMillis()); serviceSubtaskRecord.setResult(aBoolean == true ? "成功" : "失败"); serviceSubtaskRecordService.insertServiceSubtaskRecord(serviceSubtaskRecord); } } //通知 MQ 消息已被接收,可以ACK(从队列中删除)了 (这个需要根据业务再去处理ACK) } } } // /** // * 机器人第一句话语音 // * concurrency = "50" 并发数为50 // */ // @RabbitListener(queues = "ob_queue", concurrency = "50") // public void obVisit(String content, Message message, Channel channel) throws IOException { // try { // IvrTaskcallPhoneMQ commonTaskcallMQ = null; // ObjectMapper mapper = new ObjectMapper(); // commonTaskcallMQ = mapper.readValue(content, IvrTaskcallPhoneMQ.class); // //判断患者是否已经接电话 // String str = redisCache.getCacheObject(commonTaskcallMQ.getUuid() + "state_id"); // System.out.println("-----------------" + str); // if (StringUtils.isNotEmpty(str) && str.equals("0")) { // //患者已经接听了电话 // new PhoneUtils().ttsPlayback(commonTaskcallMQ.getScript(), commonTaskcallMQ.getUuid()); // } else if (StringUtils.isNotEmpty(str) && str.equals("-10")) { // //这个说明,已经打了指定遍数还是没有人接,可以将消息从队列中移除了 // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // return; // } else { // //进行队列尾部,进行下一次校验 //// channel.basicRecover(false); // channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // System.out.println("++++++++++++++++++++++" + str); // return; // } // //通知 MQ 消息已被接收,可以ACK(从队列中删除)了 (这个需要根据业务再去处理ACK) // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // } catch (Exception e) { // log.error("============消费失败,尝试消息补发再次消费!=============="); // log.error(e.getMessage()); // channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // } // } }