package cn.lihu.jh.module.ecg.service.queue; import cn.lihu.jh.framework.common.exception.ErrorCode; import cn.lihu.jh.module.ecg.dal.dataobject.queue.BedQueueStatisticDO; import cn.lihu.jh.module.ecg.dal.dataobject.queue.QueueStatisticDO; import cn.lihu.jh.module.ecg.dal.dataobject.room.RoomDO; import cn.lihu.jh.module.ecg.dal.mysql.room.RoomMapper; import cn.lihu.jh.module.ecg.enums.BedStatusEnum; import cn.lihu.jh.module.ecg.enums.ErrorCodeConstants; import cn.lihu.jh.module.ecg.enums.QueueStatusEnum; import cn.lihu.jh.module.infra.api.config.ConfigApi; import org.springframework.stereotype.Service; import org.springframework.validation.annotation.Validated; import cn.lihu.jh.module.ecg.controller.admin.queue.vo.*; import cn.lihu.jh.module.ecg.dal.dataobject.queue.QueueDO; import cn.lihu.jh.framework.common.pojo.PageResult; import cn.lihu.jh.framework.common.util.object.BeanUtils; import cn.lihu.jh.module.ecg.dal.mysql.queue.queueMapper; import javax.annotation.Resource; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import static cn.lihu.jh.framework.common.exception.enums.GlobalErrorCodeConstants.SUCCESS; import static cn.lihu.jh.framework.common.exception.util.ServiceExceptionUtil.exception; import static cn.lihu.jh.module.ecg.Constants.ECG_OPENING_TIME_KEY; import static cn.lihu.jh.module.ecg.Constants.ECG_QUEUE_READY_MAX_KEY; import static cn.lihu.jh.module.ecg.enums.ErrorCodeConstants.*; /** * 排队 Service 实现类 * * @author 芋道源码 */ @Service @Validated public class QueueServiceImpl implements QueueService { @Resource private ConfigApi configApi; @Resource private queueMapper queueMapper; @Resource private RoomMapper roomMapper; AtomicInteger openingFlag = new AtomicInteger(0); AtomicInteger curSeqNum = new AtomicInteger(0); PriorityBlockingQueue priorityQueue = new PriorityBlockingQueue<>(); ConcurrentHashMap mapBedVsQueue = new ConcurrentHashMap<>(); ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); Integer queueReadyMax = 0; @Override public Integer createqueue(QueueSaveReqVO createReqVO) { // 插入 QueueDO queue = BeanUtils.toBean(createReqVO, QueueDO.class); queueMapper.insert(queue); // 返回 return queue.getId(); } @Override public void updatequeue(QueueSaveReqVO updateReqVO) { // 校验存在 validatequeueExists(updateReqVO.getId()); // 更新 QueueDO updateObj = BeanUtils.toBean(updateReqVO, QueueDO.class); queueMapper.updateById(updateObj); } @Override public void deletequeue(Integer id) { // 校验存在 validatequeueExists(id); // 删除 queueMapper.deleteById(id); } @Override public ErrorCode startBedOpen(Long roomId, String bedNo) { Future future = singleThreadExecutor.submit( new BedOpenCallable(this, roomId, bedNo)); try { ErrorCode ret = future.get(); return ret; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return ECG_INNER_ERROR; } @Override public ErrorCode startBedClose(Long roomId, String bedNo) { Future future = singleThreadExecutor.submit( new BedCloseCallable(this, roomId, bedNo)); try { ErrorCode ret = future.get(); return ret; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return ECG_INNER_ERROR; } @Override public ErrorCode startBedDoctorPause(Long roomId, String bedNo, Long docId, String docName) { Future future = singleThreadExecutor.submit( new BedDoctorPauseCallable(this, roomId, bedNo, docId, docName) ); try { ErrorCode ret = future.get(); return ret; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return ECG_INNER_ERROR; } @Override public ErrorCode startBedDoctorResume(Long roomId, String bedNo, Long docId, String docName) { Future future = singleThreadExecutor.submit( new BedDoctorResumeCallable(this, roomId, bedNo, docId, docName) ); try { ErrorCode ret = future.get(); return ret; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return ECG_INNER_ERROR; } @Override public ErrorCode startBedDoctorOn(Long roomId, String bedNo, Long docId, String docName) { Future future = singleThreadExecutor.submit( new BedDoctorOnCallable(this, roomId, bedNo, docId, docName) ); try { ErrorCode ret = future.get(); return ret; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return ECG_INNER_ERROR; } @Override public ErrorCode startBedDoctorOff(Long roomId, String bedNo, Long docId, String docName) { Future future = singleThreadExecutor.submit( new BedDoctorOffCallable(this, roomId, bedNo, docId, docName) ); try { ErrorCode ret = future.get(); return ret; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return ECG_INNER_ERROR; } /** * 已关闭 或者 关闭中,可以开通工位 * @param roomId * @param bedNo * @return */ @Override public ErrorCode bedOpen(Long roomId, String bedNo) { // DB update List statusList = new ArrayList(); statusList.add(BedStatusEnum.CLOSED); Integer updateNum = roomMapper.setBedStatus(roomId, bedNo, BedStatusEnum.OPENING, statusList); if ( null==updateNum || 0 == updateNum ) return ROOM_INVALID_STATUS; List queueStatusList = new ArrayList<>(); queueStatusList.add(QueueStatusEnum.READY.getStatus()); List queueDOList = queueMapper.getDoctorQueueByStatus(roomId, bedNo, queueStatusList); // 新增工位 优先队列 BedQueueBO bedQueueBO = new BedQueueBO(); bedQueueBO.setRoomId(roomId); bedQueueBO.setBedNo(bedNo); bedQueueBO.setMaxQueueNum(queueReadyMax); bedQueueBO.setQueueNum(new AtomicInteger(queueDOList.size())); bedQueueBO.setStatusEnum(BedStatusEnum.OPENING); priorityQueue.offer(bedQueueBO); mapBedVsQueue.put(String.format("%09d%s", roomId, bedNo), bedQueueBO); return SUCCESS; } @Override public ErrorCode bedClose(Long roomId, String bedNo) { BedQueueBO bedQueueBO = mapBedVsQueue.get(String.format("%09d%s", roomId, bedNo)); if (null == bedQueueBO) return QUEUE_BED_ABNORMAL; if (bedQueueBO.getQueueNum().get() >0) return QUEUE_HAVE_PATIENT; BedStatusEnum destStatusEnum = destStatusEnum = BedStatusEnum.CLOSED; mapBedVsQueue.remove(String.format("%09d%s", roomId, bedNo)); priorityQueue.remove(bedQueueBO); // DB update List statusList = new ArrayList(); statusList.add(BedStatusEnum.OPENING); statusList.add(BedStatusEnum.DOCTOR_ON); statusList.add(BedStatusEnum.PAUSE); Integer updateNum = roomMapper.setBedStatus(roomId, bedNo, destStatusEnum, statusList); if ( null==updateNum || 0 == updateNum ) return ROOM_INVALID_STATUS; return SUCCESS; } @Override public ErrorCode bedDoctorPause(Long roomId, String bedNo, Long docId, String docName) { BedQueueBO bedQueueBO = mapBedVsQueue.get(String.format("%09d%s", roomId, bedNo)); if (null == bedQueueBO) return QUEUE_BED_ABNORMAL; bedQueueBO.setStatusEnum(BedStatusEnum.PAUSE); priorityQueue.remove(bedQueueBO); // DB update List statusList = new ArrayList(); statusList.add(BedStatusEnum.DOCTOR_ON); Integer updateNum = roomMapper.setBedDoctorPause(roomId, bedNo, docId, docName, BedStatusEnum.PAUSE, statusList); if ( null==updateNum || 0 == updateNum ) return ROOM_INVALID_STATUS; return SUCCESS; } @Override public ErrorCode bedDoctorResume(Long roomId, String bedNo, Long docId, String docName) { BedQueueBO bedQueueBO = mapBedVsQueue.get(String.format("%09d%s", roomId, bedNo)); if (null == bedQueueBO) return QUEUE_BED_ABNORMAL; bedQueueBO.setStatusEnum(BedStatusEnum.DOCTOR_ON); priorityQueue.offer(bedQueueBO); // DB update List statusList = new ArrayList(); statusList.add(BedStatusEnum.DOCTOR_ON); Integer updateNum = roomMapper.setBedDoctorPause(roomId, bedNo, docId, docName, BedStatusEnum.PAUSE, statusList); if ( null==updateNum || 0 == updateNum ) return ROOM_INVALID_STATUS; return SUCCESS; } @Override public ErrorCode bedDoctorOn(Long roomId, String bedNo, Long docId, String docName) { BedQueueBO bedQueueBO = mapBedVsQueue.get(String.format("%09d%s", roomId, bedNo)); if (null == bedQueueBO) return QUEUE_BED_ABNORMAL; // DB update List statusList = new ArrayList(); statusList.add(BedStatusEnum.OPENING); Integer updateNum = roomMapper.setBedDoctorOn(roomId, bedNo, docId, docName, BedStatusEnum.DOCTOR_ON, statusList); if ( null==updateNum || 0 == updateNum ) return ROOM_INVALID_STATUS; bedQueueBO.setStatusEnum(BedStatusEnum.DOCTOR_ON); return SUCCESS; } @Override public ErrorCode bedDoctorOff(Long roomId, String bedNo, Long docId, String docName) { BedQueueBO bedQueueBO = mapBedVsQueue.get(String.format("%09d%s", roomId, bedNo)); if (null == bedQueueBO) return QUEUE_BED_ABNORMAL; // DB update List statusList = new ArrayList(); statusList.add(BedStatusEnum.DOCTOR_ON); Integer updateNum = roomMapper.setBedDoctorOff(roomId, bedNo, docId, docName, BedStatusEnum.OPENING, statusList); if ( null==updateNum || 0 == updateNum ) return ROOM_INVALID_STATUS; bedQueueBO.setStatusEnum(BedStatusEnum.OPENING); return SUCCESS; } private void validatequeueExists(Integer id) { if (queueMapper.selectById(id) == null) { throw exception(QUEUE_NOT_EXISTS); } } @Override public QueueDO getqueue(Integer id) { return queueMapper.selectById(id); } @Override public PageResult getqueuePage(QueuePageReqVO pageReqVO) { return queueMapper.selectPage(pageReqVO); } /** * !!开诊期间,不能执行这个方法,否则会有 P0 问题 * 1. 每天开诊前 从DB同步工位的患者队列数据到 工位优先队列 * 2. 服务运维重启时 */ public void initBedQueueAndSeqNumFromDB() { priorityQueue.clear(); mapBedVsQueue.clear(); // 从DB 获取 工位列表 List bedStatusEnumList = new ArrayList(); bedStatusEnumList.add(BedStatusEnum.OPENING); bedStatusEnumList.add(BedStatusEnum.DOCTOR_ON); bedStatusEnumList.add(BedStatusEnum.PAUSE); List roomDOList = roomMapper.simpleRoomList(bedStatusEnumList); List bedQueueBOList = roomDOList.stream().map(item -> BeanUtils.toBean(item, BedQueueBO.class)).toList(); // 从DB 获取 队列中 就诊准备中人员统计 列表 List queueStatusList = new ArrayList<>(); queueStatusList.add(QueueStatusEnum.READY.getStatus()); List queueStatisticDOList = queueMapper.queueStatistic(queueStatusList); bedQueueBOList.forEach(item -> { item.maxQueueNum = queueReadyMax; Optional queueStatisticDOOptional = queueStatisticDOList.stream().filter(it->it.getRoomId()==item.roomId && it.getBedNo().equals(item.getBedNo())).findFirst(); int queueNum = queueStatisticDOOptional.isPresent() ? queueStatisticDOOptional.get().getTotalInStatus() : 0; if ( queueReadyMax < queueNum ) throw new RuntimeException("init: exceed max queue number!"); item.queueNum.set( queueNum ); priorityQueue.offer(item); mapBedVsQueue.put(String.format("%09d%s", item.roomId, item.bedNo), item); }); Integer num = queueMapper.getMaxSeqNum(); curSeqNum = new AtomicInteger(null == num ? 0 : num); } /** * TODO 新开队列时,需要把排队中的人 转到 就诊准备 状态 * 等到取下一个 排队中人员 的逻辑完成后,再回来不错 */ public void hurryup() { if (0 == openingFlag.get()) return; // 处理 过号-回来 的人 for (BedQueueBO bedQueueBO : mapBedVsQueue.values()) { while (bedQueueBO.queueNum.get() < bedQueueBO.maxQueueNum) { // 查看 当前工位 是否有过号-回来的患者 Integer updateNum = queueMapper.procPassedReturnPatient( bedQueueBO.getRoomId(), bedQueueBO.getRoomName(), bedQueueBO.getBedNo(), curSeqNum.get() + 1, QueueStatusEnum.PASSED_RETURN.getStatus(), QueueStatusEnum.READY.getStatus()); if (null == updateNum || 0 == updateNum) break; curSeqNum.getAndIncrement(); // 可能已经【并发的】在 nextPatient 中改变了值 bedQueueBO.queueNum.incrementAndGet(); // 可能已经【并发的】在 nextPatient 中改变了优先队列顺序 priorityQueue.remove(bedQueueBO); priorityQueue.offer(bedQueueBO); } } // 处理 排队中 患者 while (true) { BedQueueBO bedQueueBO = priorityQueue.peek(); if (null == bedQueueBO) return; int curQueueNum = bedQueueBO.queueNum.get(); if (curQueueNum > bedQueueBO.maxQueueNum) throw new RuntimeException("hurryup: exceed max queue number!"); if (curQueueNum == bedQueueBO.maxQueueNum) return; // 查看 是否有排队中的患者 Integer updateNum = queueMapper.preemptPatient( bedQueueBO.getRoomId(), bedQueueBO.getRoomName(), bedQueueBO.getBedNo(), curSeqNum.get() + 1, QueueStatusEnum.WAITING.getStatus(), QueueStatusEnum.READY.getStatus()); // 没有抢到排队患者 if (null == updateNum || 0 == updateNum) { return; } curSeqNum.getAndIncrement(); // 可能已经【并发的】在 nextPatient 中改变了值 bedQueueBO.queueNum.incrementAndGet(); // 可能已经【并发的】在 nextPatient 中改变了优先队列顺序 priorityQueue.remove(bedQueueBO); priorityQueue.offer(bedQueueBO); } } /** * 预约确认后的排队 * @param queueSaveReqVO */ @Override public void queue(QueueSaveReqVO queueSaveReqVO) { queueSaveReqVO.setStatus(QueueStatusEnum.WAITING.getStatus()); //排队中 QueueDO queue = BeanUtils.toBean(queueSaveReqVO, QueueDO.class); queueMapper.insert(queue); if (0 == openingFlag.get()) return; startHurryUp(); } private void nextPatient(Long roomId, String bedNo) { // 从 DB 把 序号最小的 就诊准备中的人 设置为就诊中 Integer updateNum = queueMapper.updateQueueStatus(roomId, bedNo, QueueStatusEnum.READY.getStatus(), QueueStatusEnum.ONSTAGE.getStatus()); // 该工位 没有 就诊准备中 人员 if (null == updateNum || 0 == updateNum) { return; } // 优先队列中 该工位 就诊准备中人的数量 减一 BedQueueBO bo = mapBedVsQueue.get(String.format("%09d%s", roomId, bedNo)); bo.queueNum.getAndDecrement(); // 可能已经【并发的】在 hurry-up 中改变了值 priorityQueue.remove(bo); priorityQueue.offer(bo); startHurryUp(); } public void finishNextPatient(Long roomId, String bedNo) { // 从 DB 把 就诊中的人 设置为就诊完成 Integer ret = queueMapper.updateQueueStatus(roomId, bedNo, QueueStatusEnum.ONSTAGE.getStatus(), QueueStatusEnum.FINISH.getStatus()); nextPatient(roomId, bedNo); } public void passNextPatient(Long roomId, String bedNo) { // 从 DB 把 就诊中的人 设置为过号 Integer ret = queueMapper.updateQueueStatus(roomId, bedNo, QueueStatusEnum.ONSTAGE.getStatus(), QueueStatusEnum.PASSED.getStatus()); nextPatient(roomId, bedNo); } public List getDoctorQueueByStatus(Long roomId, String bedNo, List statusList) { List queueDOList = queueMapper.getDoctorQueueByStatus(roomId, bedNo, statusList); return queueDOList; } public PatientStatisticVO getPatientStatistic(Long roomId, String bedNo) { PatientStatisticVO patientStatisticVO = new PatientStatisticVO(); List bedQueueStatisticDOList = queueMapper.bedQueueStatistic(roomId, bedNo); bedQueueStatisticDOList.forEach(item -> { if (QueueStatusEnum.READY.getStatus() == item.getStatus()) { patientStatisticVO.setReadyNum(item.getTotalInStatus()); } else if (QueueStatusEnum.FINISH.getStatus() == item.getStatus()) { patientStatisticVO.setFinishedNum(item.getTotalInStatus()); } else if (QueueStatusEnum.PASSED.getStatus() == item.getStatus()) { patientStatisticVO.setPassedNum(item.getTotalInStatus()); } }); List statusList = new ArrayList<>(); statusList.add(QueueStatusEnum.WAITING.getStatus()); Integer num = queueMapper.statusStatistic(statusList); patientStatisticVO.setQueuingNum(num); return patientStatisticVO; } @Override public void setQueueReadyMax(Integer max) { queueReadyMax = max; } public void startBiz() { if (1 == openingFlag.get()) return; // 清除非当天的排队人员 queueMapper.clearQueue(); initBedQueueAndSeqNumFromDB(); openingFlag.set(1); hurryup(); } public void closeBiz() { openingFlag.set(0); } @Override public Integer recallPatient(Long roomId, String bedNo, String patId) { Integer updateNum = queueMapper.passedPatientReturn(roomId, bedNo, patId, QueueStatusEnum.PASSED.getStatus(), QueueStatusEnum.PASSED_RETURN.getStatus()); startHurryUp(); return updateNum; } @Override public Integer patientJump(String patId, Byte jumped) { Integer updateNum = queueMapper.queueJump(patId, QueueStatusEnum.WAITING.getStatus(), jumped); startHurryUp(); return updateNum; } private void startHurryUp() { singleThreadExecutor.execute( () -> { hurryup(); }); } }