collapse executor 是一个高性能、低延迟的批量执行器,可有效支持高并发的热点请求,支持与Spring Boot集成,帮助开发者快速构建高性能的微服务,提高服务资源利用率的同时降低服务响应时间。涉及技术点包括CompletableFuture、Spring Boot、WebClient、Servlet Async等主流技术栈
- 部分接口的并发量很高,想提升性能的同时又不希望引入缓存服务,减少额外维护成本。
- 单次调用转批量调用。在同一时刻将不同线程的参数合并为一个参数再执行批处理逻辑。
- 希望降低客户端的远程连接数量。
- 希望减少服务端工作线程消耗。
- 批量获取基于DB/redis的自增序列
- API简单易上手,扩展难度低
默认提供了
spring-boot集成,可在spring-boot环境下开箱即用。且核心逻辑已高度抽象,二次扩展实现简单,仅需编写合并请求以及拆分响应两块逻辑。
- 高性能0延迟,发起批量请求时无需等待时间窗口
巧妙的利用了
提交任务->任务执行两个行为之间的时间间隔进行输入的批量收集,相比于等待一个时间窗口(如等待2ms)的设计,该设计可在批处理下依然保证极高的实时性,且在高并发的场景下也有着不俗的批量收集能力。
- 设计简单易维护,无需维护第三方服务
核心逻辑完全不依赖任何第三方库/服务,完全基于JDK库进行实现。
以下两张图解释了有无折叠执行器的调用差异。当无请求折叠时,请求与网络连接数的比例为1:1;当使用请求折叠后,请求与网络连接数的比例为N:1,即多个请求会合并为一个请求发起远程调用,由此可以做到减少I/O次数、减少后端压力,从而提升调用性能降低RT。
必备条件: JDK8及以上
该方式适用于简单的幂等请求的场景,通常需要用户手动指定本次调用所属的并发分组。
以下该案例表示将当前传入的Callable按照
example group进行分组。 同一并发分组下的Callable仅执行一次,并将这一次的返回结果作为同一并发分组发起的请求结果
BlockingCollapseExecutorExample.java
public class BlockingCollapseExecutorExample { public static void main(String[] args) throws Throwable { BlockingCallableGroupCollapseExecutor blockingCollapseExecutor = new BlockingCallableGroupCollapseExecutor(); String outputString = blockingCollapseExecutor.execute("example group", () -> "Hello World Collapse Executor. Blocking"); System.out.println(outputString); } }
AsyncCollapseExecutorExample.java
public class AsyncCollapseExecutorExample { public static void main(String[] args) throws Throwable { AsyncCallableGroupCollapseExecutor asyncCallableGroupCollapseExecutor = new AsyncCallableGroupCollapseExecutor(); asyncCallableGroupCollapseExecutor.setExecutor(new ThreadPoolExecutor(10, 10, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> { Thread thread = new Thread(r); thread.setDaemon(true); return thread; })); asyncCallableGroupCollapseExecutor.execute("example group", () -> "Hello World Collapse Executor. Async") .thenAccept(System.out::println) .thenRun(() -> System.exit(0)); System.in.read(); } }
FutureCollapseExecutorExample.java
这种方式必须保证Callable中的处理逻辑是非阻塞的!!!
public class FutureCollapseExecutorExample { public static void main(String[] args) throws Throwable { FutureCallableGroupCollapseExecutor futureCollapseExecutor = new FutureCallableGroupCollapseExecutor(); futureCollapseExecutor.execute("example group", () -> CompletableFuture.completedFuture("Hello World Collapse Executor. Future")) .thenAccept(System.out::println) .thenRun(() -> System.exit(0)); System.in.read(); } }
该方式适用于后端服务提供了批处理接口的场景,将同并发下其他线程的输入合并调用后端服务的批处理接口,可以减少多次不必要的单次调用,如批量查询。
由于这种方式可以更好的处理输入组,故该方式合并效率可以更高,由此带来的性能提升也会更高。
CustomBlockingCollapseExecutor.java
public class CustomBlockingCollapseExecutor extends CollapseExecutorBlockingSupport<Long, UserEntity, Map<Long, UserEntity>> { @Override protected Map<Long, UserEntity> doExecute(Collection<Input<Long>> inputs) { //此处编写批量请求逻辑 return null; } @Override protected void bindingOutput(Map<Long, UserEntity> users, List<Bundle<Long, UserEntity>> bundles) { //此处编写批量响应与原始请求关联的逻辑 } }
CustomAsyncCollapseExecutor.java
与同步阻塞调用类似,主要差异为需要设置一个异步线程池,用于执行批量请求逻辑。
public class CustomBlockingCollapseExecutor extends CollapseExecutorAsyncSupport<Long, UserEntity, Map<Long, UserEntity>> { @Override protected Map<Long, UserEntity> doExecute(Collection<Input<Long>> inputs) { //此处编写批量请求逻辑 return null; } @Override protected void bindingOutput(Map<Long, UserEntity> users, List<Bundle<Long, CompletableFuture<UserEntity>>> bundles) { for (Bundle<Long, CompletableFuture<UserEntity>> bundle : bundles) { Long inputId = bundle.getInput(); UserEntity userEntity = users.get(inputId); //需要返回CompletableFuture类型 bundle.bindOutput(CompletableFuture.completedFuture(userEntity)); } } }
collapse executor已经对Spring Boot进行了适配,利用Spring Boot自动装配能力简化了在Spring环境下的使用体验,通过简单的配置即可使用。
在Spring Boot中,collapse executor对主要的几个组件进行了适配,分别是
RestTemplate
、WebClient
、Servlet。
以下是一个折叠执行器的yaml配置例子及解释,详情可参考collapse-executor-sample-spring-boot中的application.yaml
collapse: executor: enabled: true # 折叠执行器的总开关,配置为false后,后面的所有配置(servlet、rest-template、web-client)将失效 wait-threshold: 10 #批量收集的最小阈值 collecting-wait-time: 0 #声明批量收集未达到阈值时的行为。 #collecting-wait-time < 0时:不做任何等待,立即发起调用 #collecting-wait-time = 0时:让出当前收集线程时间片等待下次调度后再发起调用 #collecting-wait-time > 0时:等待指定的时间后再发起调用,单位为毫秒(ms) rest-template: enabled: true #true表示打开RestTemplate的合并拦截器 collapse-policies: #声明合并策略,可以配置多个 sample-policy1: #策略名字 collapse-request-headers: #声明需要合并的请求头名字 - authorization collapse-request-queries: #声明需要合并的查询参数名字 - sample sample-policy2: collapse-request-headers: - user-id collapse-request-queries: - sample collapse-groups: # collapse-policy-name可以省略,省略后使用默认策略仅合并path相同的请求,而忽略其他任何参数 # 例如:此时并发发起 /user/2、/user/2、/article/2、/article/2 [4]个请求,由于前两个请求满足 /user/*,则会将前两个合并为 [1] 个请求发起调用; # 而第三第四个/article/2请求没有匹配到配置中的声明的折叠组,则依然会按照 [2] 个请求分别发起调用 - uris: - /user/* - /test/noop* #------------------------------------------------------------------ # 例如:此时并发发起 /samples/1(header:authorization=test), /samples/1(header:authorization=test), /samples/1(header:authorization=demo) [3]个请求, # 由于前两个请求携带的[authorization]请求头值相同,则会将前两个合并为 [1] 个请求发起调用; # 而第三个请求则会单独发起调用,与前两个不是同一组! - collapse-policy-name: sample-policy1 #需要与前面声明的策略名对应 uris: - /samples/*
@Configuration public class SampleConfiguration { /** * 基于RestTemplate的使用方式 */ @Bean public RestTemplate restTemplate(CollapseHttpRequestInterceptor collapseHttpRequestInterceptor) { //注入CollapseHttpRequestInterceptor拦截器实例,并添加到RestTemplate实例上 return new RestTemplateBuilder() .interceptors(collapseHttpRequestInterceptor) .build(); } /** * 基于WebClient的使用方式 */ @Bean public WebClient webClient(CollapseExchangeFilterFunction exchangeFilterFunction) { //注入CollapseExchangeFilterFunction过滤器实例,并添加到WebClient实例上 return WebClient.builder().filter(exchangeFilterFunction).build(); } }
3. 启动SpringBootSampleApplication查看结果
业务逻辑位于 AbstractBlockingCallSample
public abstract class AbstractBlockingCallSample { //已省略前后无关代码 @EventListener(ApplicationReadyEvent.class) public void processOnStarted() { UriComponentsBuilder baseUriBuilder = UriComponentsBuilder.fromUriString("http://localhost").port(serverPort); try { System.out.println("--------------------------------[" + prefix + "] start----------------------------------"); //查询id为1-50之间的所有用户,预期打印结果为执行了50次 queryId1between50(executorService, baseUriBuilder); //批处理查询id为1-50之间的所有用户,预期打印结果小于50次 queryId1between50Batch(); //单条查询id为1-2之间的所有用户,预期打印结果小于50次 queryId1between2(executorService, baseUriBuilder); System.out.println("--------------------------------[" + prefix + "] end------------------------------------\n\n\n"); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
结论:相比于未使用折叠,TPS提升高达
94%
服务参数
server.tomcat.threads.max=200
服务地址(位于 collapse-executor-samples 中的 StressTestController, 后端均延迟[100ms]后响应,逻辑一致无差异)
http://localhost:8080/test/collapse100
http://localhost:8080/test/noop100
测试参数
400用户线程数,持续压测5分钟
http://localhost:8080/test/collapse100 启用请求折叠测试结果
TPS 3785/s
RT99 115ms
http://localhost:8080/test/noop100 关闭请求折叠测试结果
TPS 1951/s
RT99 211ms
结论:相比于未使用折叠,TPS提升高达
80%
RestTemplate默认配置,差异仅为是否包含折叠执行拦截器(CollapseHttpRequestInterceptor)
RestTemplate调用地址http://localhost:8080/test/noop0
200用户线程数,持续压测1分钟
启用请求折叠测试结果
TPS 15282/s
RT99 59ms
启用请求折叠测试结果
TPS 8455/s
RT99 180ms
结论:相比于未使用折叠,TPS提升高达
43%
WebClient默认配置,差异仅为是否包含折叠执行拦截器(CollapseExchangeFilterFunction)
WebClient调用地址http://localhost:8080/test/noop0
200用户线程数,持续压测1分钟
启用请求折叠测试结果
TPS 14276/s
RT99 35ms
启用请求折叠测试结果
TPS 9971/s
RT99 111ms