eight
2024-10-24 8045900f35d49ae11e90e9155f9082d2f20ca31c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package cn.lihu.jh.framework.tenant.core.job;
 
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.lihu.jh.framework.common.util.json.JsonUtils;
import cn.lihu.jh.framework.tenant.core.service.TenantFrameworkService;
import cn.lihu.jh.framework.tenant.core.util.TenantUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
 
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
 
/**
 * 多租户 JobHandler AOP
 * 任务执行时,会按照租户逐个执行 Job 的逻辑
 *
 * 注意,需要保证 JobHandler 的幂等性。因为 Job 因为某个租户执行失败重试时,之前执行成功的租户也会再次执行。
 *
 * @author 芋道源码
 */
@Aspect
@RequiredArgsConstructor
@Slf4j
public class TenantJobAspect {
 
    private final TenantFrameworkService tenantFrameworkService;
 
    @Around("@annotation(tenantJob)")
    public String around(ProceedingJoinPoint joinPoint, TenantJob tenantJob) {
        // 获得租户列表
        List<Long> tenantIds = tenantFrameworkService.getTenantIds();
        if (CollUtil.isEmpty(tenantIds)) {
            return null;
        }
 
        // 逐个租户,执行 Job
        Map<Long, String> results = new ConcurrentHashMap<>();
        tenantIds.parallelStream().forEach(tenantId -> {
            // TODO 芋艿:先通过 parallel 实现并行;1)多个租户,是一条执行日志;2)异常的情况
            TenantUtils.execute(tenantId, () -> {
                try {
                    joinPoint.proceed();
                } catch (Throwable e) {
                    results.put(tenantId, ExceptionUtil.getRootCauseMessage(e));
                }
            });
        });
        return JsonUtils.toJsonString(results);
    }
 
}