// Source-control annotation: liusheng, 2024-04-15, commit fdf1b9c1e4489a0c2615fa596268b2f71fad7b4c
package com.ruoyi.web.component;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.common.core.redis.RedisCache;
import com.ruoyi.common.utils.DateUtils;
import com.ruoyi.web.task.PhoneTask;
import com.smartor.config.RabbitMqConfig;
import com.smartor.domain.IvrLibaTemplateVO;
import com.smartor.domain.IvrTaskcall;
import com.smartor.domain.IvrTaskcallMQ;
import com.smartor.mapper.IvrTaskcallMapper;
import com.smartor.service.IIvrLibaTemplateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
 
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
 
@Slf4j
@Component//监听此队列
public class RabbitMqReceiver {
 
    private static IIvrLibaTemplateService ivrLibaTemplateService;
 
    private static IvrTaskcallMapper ivrTaskcallMapper;
 
    private static RedisCache redisCache;
 
    // 创建固定大小的线程池
    private static final ExecutorService executorService = Executors.newFixedThreadPool(10);
 
 
    @Autowired
    public void setIIvrLibaTemplateService(IIvrLibaTemplateService ivrLibaTemplateService) {
        RabbitMqReceiver.ivrLibaTemplateService = ivrLibaTemplateService;
    }
 
    @Autowired
    public void setIvrTaskcallMapper(IvrTaskcallMapper ivrTaskcallMapper) {
        RabbitMqReceiver.ivrTaskcallMapper = ivrTaskcallMapper;
    }
 
    @Autowired
    public void setRedisCache(RedisCache redisCache) {
        RabbitMqReceiver.redisCache = redisCache;
    }
 
//    @RabbitListener(queues = RabbitMqConfig.delay_queue)
//    public void consultReceiveDealy(String content, Message message, Channel channel) throws IOException {
//        log.info("----------------接收延迟队列消息--------------------");
//        //通知 MQ 消息已被接收,可以ACK(从队列中删除)了
//        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//        try {
//            log.info("=============Do Something==============");
//        } catch (Exception e) {
//            log.error("============消费失败,尝试消息补发再次消费!==============");
//            log.error(e.getMessage());
//            /**
//             * basicRecover方法是进行补发操作,
//             * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
//             * 设置为false是只补发给当前的consumer
//             */
//            channel.basicRecover(false);
//        }
//    }
 
    /**
     * 电话随访
     * concurrency = "50"  并发数为50
     */
    @RabbitListener(queues = RabbitMqConfig.phone_queue, concurrency = "50")
    public void phoneVisit(String content, Message message, Channel channel) throws IOException {
//        System.out.println(content);
//        IvrTaskcallMQ ivrTaskcallMQ = null;
//        ObjectMapper mapper = new ObjectMapper();
//        try {
//            ivrTaskcallMQ = mapper.readValue(content, IvrTaskcallMQ.class);
//        } catch (JsonProcessingException e) {
//            e.printStackTrace();
//        }
//        //通过模板ID获取模板问题
//        IvrLibaTemplateVO ivrLibaTemplateVO = new IvrLibaTemplateVO();
//        ivrLibaTemplateVO.setID(Long.valueOf(ivrTaskcallMQ.getTemplateid()));
//        IvrLibaTemplateVO ivrLibaTemplateVO1 = ivrLibaTemplateService.selectInfoByCondition(ivrLibaTemplateVO);
//
//        //通过任务ID拿到患者信息
//        IvrTaskcall ivrTaskcall = new IvrTaskcall();
//        ivrTaskcall.setTaskid(ivrTaskcallMQ.getTaskid());
//        List<IvrTaskcall> ivrTaskcalls = ivrTaskcallMapper.selectIvrTaskcallList(ivrTaskcall);
//
//        for (IvrTaskcall ivrTaskcall1 : ivrTaskcalls) {
//            //通过多线程的方式去打电话
//            executorService.submit(new PhoneTask(ivrTaskcall1, ivrLibaTemplateVO1, redisCache));
//        }
        //通知 MQ 消息已被接收,可以ACK(从队列中删除)了   (这个需要根据业务再去处理ACK)
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        try {
            log.info("=============Do Something==============");
        } catch (Exception e) {
            log.error("============消费失败,尝试消息补发再次消费!==============");
            log.error(e.getMessage());
            /**
             * basicRecover方法是进行补发操作,
             * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
             * 设置为false是只补发给当前的consumer
             */
            channel.basicRecover(false);
        }
    }
}