在实际生产中,我们无法将所有的操作都同步得到结果,典型的有两类:定时任务与耗时任务的异步操作。这篇文章讲的主要是如何处理这俩个问题

首先我所说的场景是服务端的,并且有使用java解决这些问题。有以下几中情形的异步:处理请求异步,处理请求产生的异步任务和处理请求产生的定时任务。

1.处理请求异步:首先这是一种后端异步,对于前端来说该等多久还是得等多久;这种情形发生在你知道处理该请求需要一段时间并且前端确实需要一个同步结果的情况下。比如支付,对于后端来说,用户发起支付请求我们处理请求交由支付系统处理(可能是第三方支付、可能是银行系统、可能是自己的支付系统),这个过程是耗时的,你无法第一时间将结果告知给用户,但是处理该请求你还有其他工作要处理,比如记录流水等等工作,但是对于前端来说,他只是发起了一个支付请求,并且他只要一个支付结果。你当然可以在支付后做记录流水等等工作,但是其效率是个问题,因为他们的时间是叠加的。

同步方式的样板代码:

@RequestMapping("/order")
public String order() throws Exception {
    log.info("start--------------------");
    Thread.sleep(2000);
    log.info("end--------------------");
    return "SUCCESS";
}

异步方式处理方式——Callable:

    //异步处理
    @RequestMapping("/call")
    public Callable<String>  callAsync() throws Exception {
        log.info("start--------------------");
        Callable<String> result=new Callable<String>() {
            public String call() throws Exception {
                log.info("new start--------------------");
                Thread.sleep(2000);
                log.info("new end--------------------");
                return "success";
            }
        };
        log.info("end--------------------");
        return result;
    }

注:类型参数是实际返回值
log结果:

2021-02-12 19:36:46.739  INFO 6776 --- [nio-8080-exec-3] c.b.example.controller.AsyncController   : start--------------------
2021-02-12 19:36:46.739  INFO 6776 --- [nio-8080-exec-3] c.b.example.controller.AsyncController   : end--------------------
2021-02-12 19:36:46.754  INFO 6776 --- [         task-1] c.b.example.controller.AsyncController   : new start--------------------
2021-02-12 19:36:48.755  INFO 6776 --- [         task-1] c.b.example.controller.AsyncController   : new end--------------------

异步方式处理方式——EventBus:

    @RequestMapping("/deffer")
    public DeferredResult<String>  defferAsync() throws Exception {
        log.info("start--------------------");
        String orderNumber=ThreadLocalRandom.current().nextInt(10000)+"";
        msg.setStart(orderNumber);
        DeferredResult<String> result=new DeferredResult<>();
        defferentResultHandler.getMap().put(orderNumber, result);
        log.info("end--------------------");
        return result;
    }

注:类型参数是实际返回值,这是Spring的EventBus处理的,就是个观察者模式,在下文会用到。

@Component
public class MockMsg {
    private String start;
    private String end;
    private Logger log=LoggerFactory.getLogger(getClass());
    public String getStart() {
        return start;
    }
    public void setStart(String start) {
        new Thread(()->{
            log.info("------------------下单--------------------------");
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            this.end = start;
            log.info("------------------下单结束--------------------------");
        }).start();
    }
    public String getEnd() {
        return end;
    }
    public void setEnd(String end) {
        this.end = end;
    }
}
@Component
public class DefferentResultHandler {
    private Map<String,DeferredResult<String>> map=new HashMap<>();

    public Map<String, DeferredResult<String>> getMap() {
        return map;
    }

    public void setMap(Map<String, DeferredResult<String>> map) {
        this.map = map;
    }
}
@Component
public class MsgListener implements ApplicationListener<ContextRefreshedEvent> {
    @Autowired
    private MockMsg msg;
    @Autowired
    private DefferentResultHandler defferentResultHandler;
    private Logger log=LoggerFactory.getLogger(getClass());
    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        new Thread(()->{
            while(true) {
                if(msg.getEnd() != null) {
                    String num=msg.getEnd();
                    log.info("监听器-------------------------------------");
                    defferentResultHandler.getMap().get(num).setResult("这是处理结果===================");
                    msg.setEnd(null);
                }else {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            }
        }).start();        
    }
}

或者:

    @Async
    @EventListener(LoginEvent.class)
    public void handleAsync(ContextRefreshedEvent event) {
        // handle event 
    }

可以看出实现逻辑是非常清晰的,就是在处理完成时(标志是end有值),将结果放进去。但是我不推荐你使用这样的实现方式,十分啰嗦。

注:
a.上文的例子是打个比方,在实际生产中我们发起的支付与支付结果的获取不一定是同步的,一般是这么个流程,你发起支付,支付系统会返回一个支付流水号(或者订单号),你可以根据该ID查询结果。通常为了反正重复支付你应该存起来且与你的订单号关联起来。
b.Servlet3.0有一种处理处理异步的方式,如下:

@WebServlet(value = "/async", asyncSupported = true) // true 就可以支持异步了
public class HelloAsyncServlet extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        
        System.out.println("主线程...."+Thread.currentThread());
        AsyncContext startAsync = req.startAsync();
    // 业务逻辑进行异步处理;开启异步处理
        startAsync.start(new Runnable() {

            @Override
            public void run() {
                try {
                    System.out.println("副线程开启...."+Thread.currentThread()+Thread.currentThread()+"==>"+System.currentTimeMillis());
                    sayHello();
                    //获取到异步的上下文
                    AsyncContext asyncContext = req.getAsyncContext();
                    // 获取响应
                    ServletResponse response = asyncContext.getResponse(); //获取交出去的响应
                    response.getWriter().write("hello toov5 async");
                    startAsync.complete();  //异步调用完毕 开始给予响应
                    System.out.println("副线程结束...."+Thread.currentThread()+"==>"+System.currentTimeMillis());
                } catch (InterruptedException | IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
       System.out.println("主线程结束..."+Thread.currentThread()+"==>"+System.currentTimeMillis());
    }
    public void sayHello() throws InterruptedException {
        System.out.println(Thread.currentThread() + "processing...");
        Thread.sleep(3000);
    }
}

但是我不推荐你使用这种方式。一样啰嗦。但是他有一个优势response可以重复响应数据不会使用一次后关闭。

2.异步任务:这是一种处理请求再取请求结果的处理方式。最常用在批量处理这样的操作上面,批量处理一次性的量太大了,如果一次性告诉每个项的处理结果那必然会等很长时间。而处理这样的问题的一般方案是:将请求中要处理项目存到MQ,MQ消费处理后将结果存表。
消费消息:

@Service
public class TestService {
    private Logger log=LoggerFactory.getLogger(getClass());
    @Scheduled(cron = "0/5 * * * * ? ")
    public void testTask() {
                //消费,处理,存库
        log.info("task running ×××××");
    }
}

3.定时任务:这种情况一般是用户发起了一个定时操作,比如每个月一号为充话费,每天几点几点发告警通知等等。这种情况的实现实现思路和第二中情况是一致的:将任务存起来然后消费该任务,最后将存库。
消费任务:

@Service
public class TestService {
    private Logger log=LoggerFactory.getLogger(getClass());
    @Scheduled(cron = "0/5 * * * * ? ")
    public void testTask() {
                //处理,存库
        log.info("task running ×××××");
    }
}

注:具体实现后期有时间补充上去。如果有建议或意见请评论。转载请声明作者与出处。

标签: 解决方案

添加新评论