package cn.lihu.jh.framework.tenant.core.job; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.exceptions.ExceptionUtil; import cn.lihu.jh.framework.common.util.json.JsonUtils; import cn.lihu.jh.framework.tenant.core.service.TenantFrameworkService; import cn.lihu.jh.framework.tenant.core.util.TenantUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 多租户 JobHandler AOP * 任务执行时,会按照租户逐个执行 Job 的逻辑 * * 注意,需要保证 JobHandler 的幂等性。因为 Job 因为某个租户执行失败重试时,之前执行成功的租户也会再次执行。 * * @author 芋道源码 */ @Aspect @RequiredArgsConstructor @Slf4j public class TenantJobAspect { private final TenantFrameworkService tenantFrameworkService; @Around("@annotation(tenantJob)") public String around(ProceedingJoinPoint joinPoint, TenantJob tenantJob) { // 获得租户列表 List tenantIds = tenantFrameworkService.getTenantIds(); if (CollUtil.isEmpty(tenantIds)) { return null; } // 逐个租户,执行 Job Map results = new ConcurrentHashMap<>(); tenantIds.parallelStream().forEach(tenantId -> { // TODO 芋艿:先通过 parallel 实现并行;1)多个租户,是一条执行日志;2)异常的情况 TenantUtils.execute(tenantId, () -> { try { joinPoint.proceed(); } catch (Throwable e) { results.put(tenantId, ExceptionUtil.getRootCauseMessage(e)); } }); }); return JsonUtils.toJsonString(results); } }