| | |
| | | import com.fasterxml.jackson.databind.ObjectMapper; |
| | | import com.rabbitmq.client.Channel; |
| | | import com.ruoyi.common.core.redis.RedisCache; |
| | | import com.ruoyi.common.enums.ServiceFromEnum; |
| | | import com.ruoyi.common.utils.StringUtils; |
| | | import com.ruoyi.web.task.PhoneTask; |
| | | import com.smartor.common.SendService; |
| | | import com.smartor.config.PhoneUtils; |
| | | import com.smartor.config.RabbitMqCallPhoneConfig; |
| | | import com.smartor.config.RobotPhoneUtils; |
| | | import com.smartor.domain.*; |
| | | import com.smartor.mapper.IvrTaskMapper; |
| | | import com.smartor.mapper.IvrTaskSingleMapper; |
| | | import com.smartor.service.IIvrLibaTemplateService; |
| | | import com.smartor.service.IIvrTaskTemplateService; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.amqp.core.Message; |
| | | import org.springframework.amqp.rabbit.annotation.RabbitListener; |
| | |
| | | import org.springframework.amqp.rabbit.connection.ConnectionFactory; |
| | | import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | |
| | | @Component//监听此队列 |
| | | public class RabbitMqReceiver { |
| | | |
| | | @Value("${phonePath}") /* value is resolved from the "phonePath" property placeholder */ |
| | | private String phonePath; |
| | | /* The collaborators below are static fields; setter methods visible in this class assign ivrTaskTemplateService, ivrLibaTemplateService and robotPhoneUtils. */ |
| | | private static IIvrLibaTemplateService ivrLibaTemplateService; |
| | | private static IIvrTaskTemplateService ivrTaskTemplateService; |
| | | |
| | | private static IvrTaskSingleMapper ivrTaskcallMapper; |
| | | /* NOTE(review): no setter for ivrTaskcallMapper or rabbitMqCallPhoneConfig is visible in this excerpt - confirm how they are populated. */ |
| | |
| | | |
| | | private static RabbitMqCallPhoneConfig rabbitMqCallPhoneConfig; |
| | | |
| | | private static RobotPhoneUtils robotPhoneUtils; |
| | | |
| | | // Fixed-size thread pool (10 threads); the field is static, so a single pool serves the whole class |
| | | private static final ExecutorService executorService = Executors.newFixedThreadPool(10); |
| | | |
| | | /** |
| | | * Setter-injection hook: stores the supplied template service in the class-level (static) field. |
| | | * |
| | | * @param ivrTaskTemplateService service instance kept for later use by this class |
| | | */ |
| | | @Autowired |
| | | public void setIIvrTaskTemplateService(IIvrTaskTemplateService ivrTaskTemplateService) { |
| | | // The target field is static, so it is qualified with the class name; the parameter shadows it otherwise |
| | | RabbitMqReceiver.ivrTaskTemplateService = ivrTaskTemplateService; |
| | | } |
| | | |
| | | @Autowired |
| | | public void setIIvrLibaTemplateService(IIvrLibaTemplateService ivrLibaTemplateService) { |
| | | RabbitMqReceiver.ivrLibaTemplateService = ivrLibaTemplateService; /* NOTE(review): no closing brace for this setter appears before the next method header in this excerpt */ |
| | | public void setRobotPhoneUtilsService(RobotPhoneUtils robotPhoneUtils) { |
| | | RabbitMqReceiver.robotPhoneUtils = robotPhoneUtils; |
| | | } |
| | | |
| | | @Autowired |
| | | /* NOTE(review): the member this @Autowired annotates is not present in this excerpt; the span reads like two revisions interleaved - confirm against the real file */
| | | } |
| | | |
| | | /** |
| | | * Phone follow-up / task follow-up consumer. |
| | | * Declared on "phone_queue" and "task_queue", each with concurrency = "50". |
| | | * |
| | | * As written here: builds a template query from the message's template id, loads the IvrTaskSingle rows for its |
| | | * task id, submits one PhoneTask per row to executorService, acknowledges the delivery with basicAck, and on any |
| | | * Exception logs the error and calls basicReject with the requeue flag set to true. |
| | | * |
| | | * @param content message body as a String; no line visible in this excerpt reads it |
| | | * @param message AMQP message; supplies the delivery tag and is handed to each PhoneTask |
| | | * @param channel channel used for basicAck / basicReject |
| | | * @throws IOException declared by the signature (basicAck / basicReject can raise it) |
| | | * |
| | | * NOTE(review): two near-identical lookup/submit sections and a second "catch" without a visible matching "try" |
| | | * suggest that two revisions of this method are interleaved here - confirm against the real file. |
| | | * NOTE(review): ivrTaskcallMQ is initialised to null and no visible line assigns it before it is dereferenced; |
| | | * presumably the statement that deserialises content is missing from this excerpt. |
| | | * NOTE(review): getSendState().equals(3) and equals(4) compare against Integer literals; if getSendState() returns |
| | | * a String (the StringUtils.isNotEmpty guard suggests so) these checks can never match - verify. |
| | | * NOTE(review): the two getStopState() values are compared with != ; if they are boxed types this is a reference |
| | | * comparison - verify. |
| | | * NOTE(review): basicAck for the same delivery tag appears both before and inside the per-serviceFrom loop; |
| | | * verify that a delivery is acknowledged only once. |
| | | * NOTE(review): ivrTaskMapper, redisCache, sendService and log are used below but are not declared in the visible |
| | | * part of the class. |
| | | */ |
| | | @RabbitListener(queues = "phone_queue", concurrency = "50") |
| | | @RabbitListener(queues = "task_queue", concurrency = "50") |
| | | public void phoneVisit(String content, Message message, Channel channel) throws IOException { |
| | | |
| | | try { |
| | | IvrTaskcallMQ ivrTaskcallMQ = null; |
| | | |
| | |
| | | } catch (JsonProcessingException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | //Fetch the template's questions by template ID |
| | | IvrLibaTemplateVO ivrLibaTemplateVO = new IvrLibaTemplateVO(); |
| | | ivrLibaTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid())); |
| | | IvrLibaTemplateVO ivrLibaTemplateVO1 = ivrLibaTemplateService.selectInfoByCondition(ivrLibaTemplateVO); |
| | | |
| | | //Fetch the patient records by task ID |
| | | IvrTaskSingle ivrTaskcall = new IvrTaskSingle(); |
| | | ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid()); |
| | | List<IvrTaskSingle> ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall); |
| | | |
| | | for (IvrTaskSingle ivrTaskcall1 : ivrTaskcalls) { |
| | | IvrTask ivrTask = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcall1.getTaskid()); |
| | | if (StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(3) || StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(4)) { |
| | | //If the task has been "paused" or "terminated", stop iterating |
| | | break; |
| | | } |
| | | // //Place the phone calls on worker threads |
| | | executorService.submit(new PhoneTask(ivrTaskcall1, ivrLibaTemplateVO1, redisCache, rabbitMqCallPhoneConfig, message, sendService)); |
| | | //Check whether stopstate in ivrTaskcallMQ matches the one in ivr_task; a mismatch means the task was paused |
| | | IvrTask ivrTask1 = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcallMQ.getTaskid()); |
| | | if (ivrTask1.getStopState() != ivrTaskcallMQ.getStopState()) { |
| | | //Remove the message from the queue |
| | | channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); |
| | | return; |
| | | } |
| | | //Tell MQ the message was received and can be ACKed (removed from the queue) (ACK handling still needs to be aligned with the business flow) |
| | | channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); |
| | | |
| | | if (StringUtils.isNotEmpty(ivrTaskcallMQ.getServicefrom())) { |
| | | //If the task's send channel list is not empty |
| | | String[] split = ivrTaskcallMQ.getServicefrom().split(","); |
| | | for (String serviceFrom : split) { |
| | | String descByCode = ServiceFromEnum.getDescByCode(Integer.valueOf(serviceFrom)); |
| | | //Consider refactoring this with the strategy pattern; it is too ugly otherwise |
| | | if (descByCode.equals("电话")) { /* the literal "电话" means "phone" */ |
| | | //Fetch the template's questions by template ID |
| | | IvrTaskTemplateVO ivrTaskTemplateVO = new IvrTaskTemplateVO(); |
| | | ivrTaskTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid())); |
| | | IvrTaskTemplateVO ivrTaskTemplateVO1 = ivrTaskTemplateService.selectInfoByCondition(ivrTaskTemplateVO); |
| | | |
| | | //Fetch the patient records by task ID |
| | | IvrTaskSingle ivrTaskcall = new IvrTaskSingle(); |
| | | ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid()); |
| | | List<IvrTaskSingle> ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall); |
| | | |
| | | for (IvrTaskSingle ivrTaskcall1 : ivrTaskcalls) { |
| | | IvrTask ivrTask = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcall1.getTaskid()); |
| | | if (StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(3) || StringUtils.isNotEmpty(ivrTask.getSendState()) && ivrTask.getSendState().equals(4)) { |
| | | //If the task has been "paused" or "terminated", stop iterating |
| | | break; |
| | | } |
| | | //Place the phone calls on worker threads |
| | | executorService.submit(new PhoneTask(ivrTaskcall1, ivrTaskTemplateVO1, redisCache, rabbitMqCallPhoneConfig, message, sendService, phonePath, robotPhoneUtils)); |
| | | } |
| | | } else if (descByCode.equals("公众号")) { /* the literal "公众号" means "official account"; this branch is empty */ |
| | | |
| | | } |
| | | //Tell MQ the message was received and can be ACKed (removed from the queue) (ACK handling still needs to be aligned with the business flow) |
| | | channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("============消费失败,尝试消息补发再次消费!=============="); |
| | | log.error(e.getMessage()); |
| | | channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | @Bean |
| | | public SimpleRabbitListenerContainerFactory jsonContainerFactory(ConnectionFactory connectionFactory) { |
| | | SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); |