线程编排在微服务中的应用

架构小魔方2024-04-22 19:19:54  84

为什么需要线程编排?

在微服务的架构里面,不同于单体应用,它是通过领域划分的方式将业务拆成一个个具有高类聚低耦合的原子性服务,在通过前端应用进行组合形成一个统一的、可供业务使用的功能性产品,就以美团的订单管理为例

在美团的订单管理列表页面,包含订单主信息、商品、结算、配送、用户信息、骑手信息、餐损、退款、客服赔付(参照下面订单卡片截图)等,涉及到的后台服务多达几十个,如果通过传统的串行调用,可想而知它的耗时将有多长?稳定性将有多糟糕,更谈不上用户体验了,尤其在流量大的场景,直接会引起服务雪崩式塌裂。

因此基于微服务的线程编排就孕育而生,专为解决这种复杂的场景下涉及到多服务的业务组合场景,有关美团如何使用异步化的线程编排进行API组合的案例可以参考美团技术团队的文章

JAVA原生实现

先来看看JAVA 原生的线程编排如何实现,在JAVA8引入CompletabletFuture,它实际也是继承了JAVA8前版本Future的子类,让线程编排工作变得更简单。

看一个上面的例子用CompletabletFuture如何实现,其中A和B 并行执行——>流转到C——>流转到D——>并行执行E和F->执行结束。

public class ThreadJoin { public static void main(String[] args) throws Exception { //构建一个异步执行的线程池 ExecutorService executor = Executors.newFixedThreadPool(10); //a和b并行执行 CompletableFuture a = CompletableFuture.supplyAsync( -> { System.out.println("执行调用【A服务】"); return "【返回调用A的执行结果】"; }, executor); CompletableFuture b = CompletableFuture.supplyAsync( -> { System.out.println("执行调用【B服务】"); return "【返回调用B的执行结果】"; }, executor); //c等待a和b执行完后,且获取到a和b的执行结果,进行执行c CompletableFuture c = a.thenCombine(b, (result1, result2) -> { System.out.println("前两步操作结果分别为:" + result1 + result2); return "【返回调用C服务的结果】"; }); //等待c执行完后,且获取c的结果在执行d CompletableFuture d = c.thenApply((result) -> { System.out.println("c服务调用的结果 : " + result); return "【返回调用D服务的结果】"; }); //d执行完后并行异步执行e和f CompletableFuture e = d.thenApplyAsync((result) -> { System.out.println("e服务获取 d服务调用的结果 : " + result); return "【返回调用e服务的结果】"; }); CompletableFuture f = d.thenApplyAsync((result) -> { System.out.println("f服务获取 d服务调用的结果 : " + result); return "【返回调用f服务的结果】"; }); //e和f执行完后,进行结束,获得最后的执行结果。 CompletableFuture end = e.thenCombine(f, (result1, result2) -> { System.out.println("前e和f操作结果分别为:" + result1 + result2); return "结束"; }); String endResult = end.get; System.out.println("返回:"+endResult); }}

一个类似A&B->C->D-E&F->End线程编排的编码就结束,是不是非常简单,当然CompletabletFuture还有非常多有用的方法,这个大家后续可以参考一些资料进行学习。

借用开源技术组件实现

目前网上开源的常见的线程编排组件有两个,一个是京东开源的asyncTool开源地址:https://gitee.com/jd-platform-opensource/asyncTool,另一个是基于asyncTool改进的gobrs-async,开源地址:https://gitee.com/dromara/gobrs-async。由于gobrs-async目前还不太成熟,bug较多,不建议使用,本文以asyncTool为例实现上面的例子

第一步:各个节点实现IWorker和ICallback接口

public class WorkA implements IWorker, ICallback { /** * 任务开始的地方 */ @Override public void begin { } /** * 一般在这里做一些业务逻辑的处理,比如RPC的调用 * @param object object * @param allWrappers 任务包装 * @return */ @Override public String action(String object, Map allWrappers) { System.out.println("---执行A---"); return "A"; } /** * 任务执行完成的地方 * @param success * @param param * @param workResult */ @Override public void result(boolean success, String param, WorkResult workResult) { }}

IWorker接口定义一个最小的任务执行单元,通常的业务逻辑的处理均在这里面,而ICallback定义了每个work任务开始前和任务执行完之后结果情况。对应的三个方法,分别对应了任务执行前(begin)、任务执行中(action)、任务执行后(result),任务执行的每一步都尽在掌握中

第二步:定义wrapper:组合了worker和callback,是一个 最小的调度单元 。通过编排wrapper之间的关系,达到组合各个worker编排的目的

WorkerWrapper workend = new WorkerWrapper.Builder .worker(end) .callback(end) .id("end") .build; WorkerWrapper workf = new WorkerWrapper.Builder .worker(f) .callback(f) .id("f") .next(workend) .build; WorkerWrapper worke = new WorkerWrapper.Builder .worker(e) .callback(e) .next(workend) .id("e") .build; WorkerWrapper workd = new WorkerWrapper.Builder .worker(d) .callback(d) .id("d") .next(worke,workf) .build; WorkerWrapper workc = new WorkerWrapper.Builder .worker(c) .callback(c) .id("c") .next(workd) .build; WorkerWrapper workb = new WorkerWrapper.Builder .worker(b) .callback(b) .id("b") .next(workc) .build; WorkerWrapper worka = new WorkerWrapper.Builder .worker(a) .callback(a) .id("a") .next(workc) .build;

第三步:开启任务编排的执行,待任务执行结束后,获取执行结果。

Async.beginWork(3500,worka,workb);System.out.println("返回结果:"+workc.getWorkResult);Async.shutDown;

写在最后

线程编排在微服务架构中是不可或缺的组件,也是充分将业务拆分哪些环节是需要同步、哪些是可异步,通过对线程进行组合,充分利用多线程+异步的方式,缩短服务的RT耗时,提升系统的吞吐量,最终反哺用户,提升用户体验,而使用JAVA原生方式还是使用开源组件并没有优劣区别,关键在于自己的使用习惯和业务场景需要。

转载此文是出于传递更多信息目的。若来源标注错误或侵犯了您的合法权益,请与本站联系,我们将及时更正、删除、谢谢。
https://www.414w.com/read/302525.html
0
最新回复(0)