About 1 min

@Slf4j
@Component
public class AsyncManager {

    @Resource
    private Executor executorService;

    /**
     * 无返回值的异步任务提交
     *
     * @param taskFun
     * @param fn      异常补偿函数
     */
    public void doAsyncWithoutResult(Consumer<?> taskFun, Function<Throwable, ? extends Void> fn) {
        doAsyncWithResult(() -> {
            taskFun.accept(null);
            return null;
        }, fn);
    }

    /**
     * 有返回值的异步任务提交
     *
     * @param taskFun
     * @param fn
     * @param <T>
     * @return
     */
    public <T> CompletableFuture<T> doAsyncWithResult(Supplier<T> taskFun, Function<Throwable, ? extends T> fn) {
        return doAsyncWithResult(executorService, taskFun, fn, false);
    }

    /**
     * 有返回值的异步任务提交
     *
     * @param taskFun 业务函数
     * @param fn      业务异常补偿函数
     * @param isThrow 是否向上层抛出异常
     * @param <T>
     * @return
     */
    public <T> CompletableFuture<T> doAsyncWithResult(Executor executorParam, Supplier<T> taskFun, Function<Throwable, ? extends T> fn, final boolean isThrow) {
        final String threadName = ThreadUtil.getThreadName();
        final String headerStr = HeaderThreadLocal.getHeader();
        CompletableFuture<T> f = CompletableFuture.supplyAsync(() -> {
            log.info("异步操作start");
            threadInit(threadName, headerStr);
            try {
                return taskFun.get();
            } catch (Throwable throwable) {
                log.error("异步操作发生异常: ", throwable);
                T result = null;
                if (fn != null) {
                    result = fn.apply(throwable);
                }
                if (isThrow) {
                    // CompletionException类型的异常不会被CompletableFuture包装
                    throw new CompletionException(throwable);
                } else {
                    return result;
                }
            } finally {
                log.info("异步操作end");
                threadClean();
            }
        }, executorParam);
        return f;
    }

    private void threadClean() {
        ThreadUtil.clearMDC();
        HeaderThreadLocal.clean();
    }

    private void threadInit(String threadName, String headerStr) {
        ThreadUtil.initThreadName(threadName + "-sync");
        HeaderThreadLocal.setHeader(headerStr);
    }

    /**
     * 有返回值的异步任务提交
     *
     * @param taskFun
     * @param <T>
     * @return
     */
    public <T> CompletableFuture<T> doAsyncWithResultWithException(Supplier<T> taskFun, String channelCode) {
        return doAsyncWithResultWithException(executorService, taskFun, channelCode);
    }

    /**
     * 有返回值的异步任务提交
     *
     * @param taskFun 业务函数
     * @param <T>
     * @return
     */
    public <T> CompletableFuture<T> doAsyncWithResultWithException(Executor executorParam, Supplier<T> taskFun, String channelCode) {
        final String threadName = ThreadUtil.getThreadName();
        final String headerStr = HeaderThreadLocal.getHeader();
        CompletableFuture<T> f = CompletableFuture.supplyAsync(() -> {
            log.info("异步执行开始");
            threadInit(threadName, headerStr, channelCode);
            try {
                return taskFun.get();
            } catch (Throwable throwable) {
                log.warn("异步执行发生异常: ", throwable);
                // CompletionException类型的异常不会被CompletableFuture包装
                throw new CompletionException(throwable);
            } finally {
                log.info("异步执行结束");
                threadClean();
            }
        }, executorParam);
        return f;
    }

    private void threadInit(String threadName, String headerStr, String channelCode) {
        ThreadUtil.initThreadName(threadName + "-" + channelCode + "-sync");
        HeaderThreadLocal.setHeader(headerStr);
    }
}

Last update:
Contributors: gaoqisen,gaoqisen