package com.ruoyi.web.component;
|
|
import com.alibaba.fastjson2.JSON;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.rabbitmq.client.Channel;
|
import com.ruoyi.common.core.redis.RedisCache;
|
import com.ruoyi.common.enums.ServiceFromEnum;
|
import com.ruoyi.common.utils.RSAPublicKeyExample;
|
import com.ruoyi.common.utils.StringUtils;
|
import com.ruoyi.web.task.PhoneTask;
|
import com.smartor.common.SendService;
|
import com.smartor.config.PhoneUtils;
|
import com.smartor.config.RabbitMqCallPhoneConfig;
|
import com.smartor.config.RobotPhoneUtils;
|
import com.smartor.domain.*;
|
import com.smartor.mapper.IvrTaskMapper;
|
import com.smartor.mapper.IvrTaskSingleMapper;
|
import com.smartor.service.IIvrTaskTemplateService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.amqp.core.Message;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.stereotype.Component;
|
|
import java.io.IOException;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.TimeUnit;
|
|
@Slf4j
|
@Component//监听此队列
|
public class RabbitMqReceiver {
|
|
@Value("${phonePath}")
|
private String phonePath;
|
|
@Value("${pub_key}")
|
private String pub_key;
|
|
@Value("${req_path}")
|
private String req_path;
|
|
private static IIvrTaskTemplateService ivrTaskTemplateService;
|
|
private static IvrTaskSingleMapper ivrTaskcallMapper;
|
|
private static IvrTaskMapper ivrTaskMapper;
|
|
private static SendService sendService;
|
|
private static RedisCache redisCache;
|
|
private static RabbitMqCallPhoneConfig rabbitMqCallPhoneConfig;
|
|
private static RobotPhoneUtils robotPhoneUtils;
|
|
|
// 创建固定大小的线程池
|
private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
|
|
@Autowired
|
public void setIIvrTaskTemplateService(IIvrTaskTemplateService ivrTaskTemplateService) {
|
RabbitMqReceiver.ivrTaskTemplateService = ivrTaskTemplateService;
|
}
|
|
@Autowired
|
public void setRobotPhoneUtilsService(RobotPhoneUtils robotPhoneUtils) {
|
RabbitMqReceiver.robotPhoneUtils = robotPhoneUtils;
|
}
|
|
@Autowired
|
public void setSendService(SendService sendService) {
|
RabbitMqReceiver.sendService = sendService;
|
}
|
|
@Autowired
|
public void setRabbitMqCallPhoneConfig(RabbitMqCallPhoneConfig rabbitMqCallPhoneConfig) {
|
RabbitMqReceiver.rabbitMqCallPhoneConfig = rabbitMqCallPhoneConfig;
|
}
|
|
@Autowired
|
public void setIvrTaskcallMapper(IvrTaskSingleMapper ivrTaskcallMapper) {
|
RabbitMqReceiver.ivrTaskcallMapper = ivrTaskcallMapper;
|
}
|
|
@Autowired
|
public void setIvrTaskMapper(IvrTaskMapper ivrTaskMapper) {
|
RabbitMqReceiver.ivrTaskMapper = ivrTaskMapper;
|
}
|
|
@Autowired
|
public void setRedisCache(RedisCache redisCache) {
|
RabbitMqReceiver.redisCache = redisCache;
|
}
|
|
/**
|
* 任务随访
|
* concurrency = "50" 并发数为50
|
*/
|
@RabbitListener(queues = "task_queue", concurrency = "50")
|
public void phoneVisit(String content, Message message, Channel channel) throws IOException {
|
System.out.println("消息进来了----------------");
|
IvrTaskcallMQ ivrTaskcallMQ = null;
|
try {
|
ObjectMapper mapper = new ObjectMapper();
|
try {
|
if (!content.startsWith("{")) {
|
ivrTaskcallMQ = mapper.readValue("{" + content + "}", IvrTaskcallMQ.class);
|
} else {
|
ivrTaskcallMQ = mapper.readValue(content, IvrTaskcallMQ.class);
|
}
|
} catch (JsonProcessingException e) {
|
e.printStackTrace();
|
}
|
//判断一下ivrTaskcallMQ中的stopstate是否与ivr_task中的一致,不一致,则说明是暂停了
|
IvrTask ivrTask1 = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcallMQ.getTaskid());
|
if (ivrTask1.getStopState() != ivrTaskcallMQ.getStopState()) {
|
//将消息从队列中剔除
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
return;
|
}
|
|
if (StringUtils.isNotEmpty(ivrTaskcallMQ.getPreachform())) {
|
//如何任务发送方式不为空
|
String[] split = ivrTaskcallMQ.getPreachform().split(",");
|
for (String serviceFrom : split) {
|
String descByCode = ServiceFromEnum.getDescByCode(Integer.valueOf(serviceFrom));
|
//这里可以考虑用策略模式优化一下,不然太难看了
|
|
//通过模板ID获取模板问题
|
IvrTaskTemplateVO ivrTaskTemplateVO = new IvrTaskTemplateVO();
|
ivrTaskTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid()));
|
IvrTaskTemplateVO ivrTaskTemplateVO1 = ivrTaskTemplateService.selectInfoByCondition(ivrTaskTemplateVO);
|
|
//通过任务ID拿到患者信息
|
IvrTaskSingle ivrTaskcall = new IvrTaskSingle();
|
ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid());
|
List<IvrTaskSingle> ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall);
|
if (descByCode.equals("电话")) {
|
for (IvrTaskSingle ivrTaskcall1 : ivrTaskcalls) {
|
IvrTask ivrTask = ivrTaskMapper.selectIvrTaskByTaskid(ivrTaskcall1.getTaskid());
|
if (ivrTask.getSendState() != null && ivrTask.getSendState() == 3 || ivrTask.getSendState() != null && ivrTask.getSendState() == 4) {
|
//如何任务被“暂停”或“终止”
|
break;
|
}
|
//通过多线程的方式去打电话
|
executorService.submit(new PhoneTask(ivrTaskcall1, ivrTaskTemplateVO1, redisCache, rabbitMqCallPhoneConfig, message, sendService, phonePath, robotPhoneUtils));
|
}
|
} else if (descByCode.equals("多媒体")) {
|
//多媒体
|
} else if (descByCode.equals("纸质")) {
|
//纸质
|
} else if (descByCode.equals("短信")) {
|
//短信
|
// http://localhost:8099/followvisit/particty?param1=3¶m2=348
|
//对url中两个参数加密
|
RSAPublicKeyExample rsaPublicKeyExample = new RSAPublicKeyExample();
|
String taskId = rsaPublicKeyExample.encryptedData(ivrTask1.getTaskid().toString(), pub_key);
|
|
|
for (IvrTaskSingle ivrTaskSingle : ivrTaskcalls) {
|
String patid = rsaPublicKeyExample.encryptedData(ivrTaskSingle.getPatid().toString(), pub_key);
|
SendMagParam sendMagParam = new SendMagParam();
|
sendMagParam.setType("4");
|
sendMagParam.setUrl(req_path + "/followvisit/particty?param1=" + taskId + "¶m2=" + patid);
|
sendService.sendMsg(sendMagParam);
|
}
|
} else if (descByCode.equals("公众号")) {
|
//公众号
|
RSAPublicKeyExample rsaPublicKeyExample = new RSAPublicKeyExample();
|
String taskId = rsaPublicKeyExample.encryptedData(ivrTask1.getTaskid().toString(), pub_key);
|
for (IvrTaskSingle ivrTaskSingle : ivrTaskcalls) {
|
String patid = rsaPublicKeyExample.encryptedData(ivrTaskSingle.getPatid().toString(), pub_key);
|
SendMagParam sendMagParam = new SendMagParam();
|
sendMagParam.setType("5");
|
sendMagParam.setUrl(req_path + "/outsideChain?param1=" + taskId + "¶m2=" + patid + "¶m3=" + ivrTask1.getTaskName());
|
log.info("链接完整路径:{}", req_path + "/outsideChain?param1=" + taskId + "¶m2=" + patid + "¶m3=" + ivrTask1.getTaskName());
|
//这个模板ID先写死,后面做成可选
|
sendMagParam.setTmpCode("oG3pJHPVWpE81DmZsua_2tKwgJ97r0qz37z56ns7NB4");
|
Map map = new HashMap();
|
map.put("first", ivrTask1.getTaskName());
|
sendMagParam.setContent(JSON.toJSONString(map));
|
sendService.sendMsg(sendMagParam);
|
}
|
}
|
//通知 MQ 消息已被接收,可以ACK(从队列中删除)了 (这个需要根据业务再去处理ACK)
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
}
|
}
|
} catch (Exception e) {
|
Integer integer = redisCache.getCacheObject(ivrTaskcallMQ.getTaskid().toString());
|
if (integer != null && integer == 2) {
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
} else if (integer == null) {
|
redisCache.setCacheObject(ivrTaskcallMQ.getTaskid().toString(), 1, 120, TimeUnit.MINUTES);
|
} else {
|
redisCache.setCacheObject(ivrTaskcallMQ.getTaskid().toString(), integer + 1, 120, TimeUnit.MINUTES);
|
}
|
|
log.error("============消费失败,尝试消息补发再次消费!==============");
|
log.error(e.getMessage());
|
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
}
|
|
}
|
|
/**
|
* 机器人第一句话语音
|
* concurrency = "50" 并发数为50
|
*/
|
@RabbitListener(queues = "ob_queue", concurrency = "50")
|
public void obVisit(String content, Message message, Channel channel) throws IOException {
|
try {
|
IvrTaskcallPhoneMQ ivrTaskcallMQ = null;
|
ObjectMapper mapper = new ObjectMapper();
|
ivrTaskcallMQ = mapper.readValue(content, IvrTaskcallPhoneMQ.class);
|
//判断患者是否已经接电话
|
String str = redisCache.getCacheObject(ivrTaskcallMQ.getUuid() + "state_id");
|
System.out.println("-----------------" + str);
|
if (StringUtils.isNotEmpty(str) && str.equals("0")) {
|
//患者已经接听了电话
|
new PhoneUtils().ttsPlayback(ivrTaskcallMQ.getScript(), ivrTaskcallMQ.getUuid());
|
} else if (StringUtils.isNotEmpty(str) && str.equals("-10")) {
|
//这个说明,已经打了指定遍数还是没有人接,可以将消息从队列中移除了
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
return;
|
} else {
|
//进行队列尾部,进行下一次校验
|
// channel.basicRecover(false);
|
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
System.out.println("++++++++++++++++++++++" + str);
|
return;
|
}
|
//通知 MQ 消息已被接收,可以ACK(从队列中删除)了 (这个需要根据业务再去处理ACK)
|
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
} catch (Exception e) {
|
log.error("============消费失败,尝试消息补发再次消费!==============");
|
log.error(e.getMessage());
|
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
|
}
|
}
|
|
|
@Bean
|
public SimpleRabbitListenerContainerFactory jsonContainerFactory(ConnectionFactory connectionFactory) {
|
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
factory.setConnectionFactory(connectionFactory);
|
factory.setMessageConverter(new Jackson2JsonMessageConverter());
|
return factory;
|
}
|
}
|