package com.ruoyi.web.component; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.rabbitmq.client.Channel; import com.ruoyi.common.core.redis.RedisCache; import com.ruoyi.common.enums.ServiceFromEnum; import com.ruoyi.common.utils.StringUtils; import com.ruoyi.web.task.PhoneTask; import com.smartor.common.SendService; import com.smartor.config.PhoneUtils; import com.smartor.config.RabbitMqCallPhoneConfig; import com.smartor.config.RobotPhoneUtils; import com.smartor.domain.*; import com.smartor.mapper.IvrTaskMapper; import com.smartor.mapper.IvrTaskSingleMapper; import com.smartor.service.IIvrTaskTemplateService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j @Component//监听此队列 public class RabbitMqReceiver { @Value("${phonePath}") private String phonePath; private static IIvrTaskTemplateService ivrTaskTemplateService; private static IvrTaskSingleMapper ivrTaskcallMapper; private static IvrTaskMapper ivrTaskMapper; private static SendService sendService; private static RedisCache redisCache; private static RabbitMqCallPhoneConfig rabbitMqCallPhoneConfig; private static RobotPhoneUtils robotPhoneUtils; // 创建固定大小的线程池 private static final ExecutorService executorService = Executors.newFixedThreadPool(10); @Autowired public void setIIvrTaskTemplateService(IIvrTaskTemplateService ivrTaskTemplateService) { RabbitMqReceiver.ivrTaskTemplateService = ivrTaskTemplateService; } @Autowired public void setRobotPhoneUtilsService(RobotPhoneUtils robotPhoneUtils) { RabbitMqReceiver.robotPhoneUtils = robotPhoneUtils; } @Autowired public void setSendService(SendService sendService) { RabbitMqReceiver.sendService = sendService; } @Autowired public void setRabbitMqCallPhoneConfig(RabbitMqCallPhoneConfig rabbitMqCallPhoneConfig) { RabbitMqReceiver.rabbitMqCallPhoneConfig = rabbitMqCallPhoneConfig; } @Autowired public void setIvrTaskcallMapper(IvrTaskSingleMapper ivrTaskcallMapper) { RabbitMqReceiver.ivrTaskcallMapper = ivrTaskcallMapper; } @Autowired public void setIvrTaskMapper(IvrTaskMapper ivrTaskMapper) { RabbitMqReceiver.ivrTaskMapper = ivrTaskMapper; } @Autowired public void setRedisCache(RedisCache redisCache) { RabbitMqReceiver.redisCache = redisCache; } /** * 任务随访 * concurrency = "50" 并发数为50 */ @RabbitListener(queues = "task_queue", concurrency = "50") public void phoneVisit(String content, Message message, Channel channel) throws IOException { try { IvrTaskcallMQ ivrTaskcallMQ = null; ObjectMapper mapper = new ObjectMapper(); try { if (!content.contains("{")) { ivrTaskcallMQ = mapper.readValue("{" + content + "}", IvrTaskcallMQ.class); } else { ivrTaskcallMQ = mapper.readValue(content, IvrTaskcallMQ.class); } } catch (JsonProcessingException e) { e.printStackTrace(); } //判断一下ivrTaskcallMQ中的stopstate是否与ivr_task中的一致,不一致,则说明是暂停了 IvrTask ivrTask1 = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcallMQ.getTaskid()); if (ivrTask1.getStopState() != ivrTaskcallMQ.getStopState()) { //将消息从队列中剔除 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); return; } if (StringUtils.isNotEmpty(ivrTaskcallMQ.getServicefrom())) { //如何任务发送方式不为空 String[] split = ivrTaskcallMQ.getServicefrom().split(","); for (String serviceFrom : split) { String descByCode = ServiceFromEnum.getDescByCode(Integer.valueOf(serviceFrom)); //这里可以考虑用策略模式优化一下,不然太难看了 if (descByCode.equals("电话")) { //通过模板ID获取模板问题 IvrTaskTemplateVO ivrTaskTemplateVO = new IvrTaskTemplateVO(); ivrTaskTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid())); IvrTaskTemplateVO ivrTaskTemplateVO1 = ivrTaskTemplateService.selectInfoByCondition(ivrTaskTemplateVO); //通过任务ID拿到患者信息 IvrTaskSingle ivrTaskcall = new IvrTaskSingle(); ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid()); List ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall); for (IvrTaskSingle ivrTaskcall1 : ivrTaskcalls) { IvrTask ivrTask = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcall1.getTaskid()); if (StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(3) || StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(4)) { //如何任务被“暂停”或“终止” break; } //通过多线程的方式去打电话 executorService.submit(new PhoneTask(ivrTaskcall1, ivrTaskTemplateVO1, redisCache, rabbitMqCallPhoneConfig, message, sendService, phonePath, robotPhoneUtils)); } } else if (descByCode.equals("公众号")) { } //通知 MQ 消息已被接收,可以ACK(从队列中删除)了 (这个需要根据业务再去处理ACK) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } } catch (Exception e) { log.error("============消费失败,尝试消息补发再次消费!=============="); log.error(e.getMessage()); channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } /** * 机器人第一句话语音 * concurrency = "50" 并发数为50 */ @RabbitListener(queues = "ob_queue", concurrency = "50") public void obVisit(String content, Message message, Channel channel) throws IOException { try { IvrTaskcallPhoneMQ ivrTaskcallMQ = null; ObjectMapper mapper = new ObjectMapper(); ivrTaskcallMQ = mapper.readValue(content, IvrTaskcallPhoneMQ.class); //判断患者是否已经接电话 String str = redisCache.getCacheObject(ivrTaskcallMQ.getUuid() + "state_id"); System.out.println("-----------------" + str); if (StringUtils.isNotEmpty(str) && str.equals("0")) { //患者已经接听了电话 new PhoneUtils().ttsPlayback(ivrTaskcallMQ.getScript(), ivrTaskcallMQ.getUuid()); } else if (StringUtils.isNotEmpty(str) && str.equals("-10")) { //这个说明,已经打了指定遍数还是没有人接,可以将消息从队列中移除了 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); return; } else { //进行队列尾部,进行下一次校验 // channel.basicRecover(false); channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); System.out.println("++++++++++++++++++++++" + str); return; } //通知 MQ 消息已被接收,可以ACK(从队列中删除)了 (这个需要根据业务再去处理ACK) channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { log.error("============消费失败,尝试消息补发再次消费!=============="); log.error(e.getMessage()); channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } @Bean public SimpleRabbitListenerContainerFactory jsonContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } }