Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

collapse executor 是一个高性能、低延迟的批量合并执行器,可有效支持高并发的热点请求,支持与Spring Boot集成,帮助开发者快速构建高性能的微服务,提高服务资源利用率的同时降低服务响应时间。涉及技术点包括CompletableFuture、Spring Boot、WebClient、Servlet Async等主流技术栈

License

Notifications You must be signed in to change notification settings

icodening/collapse-executor

Repository files navigation

项目简介

collapse executor 是一个高性能、低延迟的批量执行器,可有效支持高并发的热点请求,支持与Spring Boot集成,帮助开发者快速构建高性能的微服务,提高服务资源利用率的同时降低服务响应时间。涉及技术点包括CompletableFuture、Spring Boot、WebClient、Servlet Async等主流技术栈

适用场景

  1. 部分接口的并发量很高,想提升性能的同时又不希望引入缓存服务,减少额外维护成本。
  2. 单次调用转批量调用。在同一时刻将不同线程的参数合并为一个参数再执行批处理逻辑。
  3. 希望降低客户端的远程连接数量。
  4. 希望减少服务端工作线程消耗。
  5. 批量获取基于DB/redis的自增序列

项目亮点

  1. API简单易上手,扩展难度低

默认提供了spring-boot集成,可在spring-boot环境下开箱即用。且核心逻辑已高度抽象,二次扩展实现简单,仅需编写合并请求以及拆分响应两块逻辑。

  1. 高性能0延迟,发起批量请求时无需等待时间窗口

巧妙的利用了提交任务->任务执行两个行为之间的时间间隔进行输入的批量收集,相比于等待一个时间窗口(如等待2ms) 的设计,该设计可在批处理下依然保证极高的实时性,且在高并发的场景下也有着不俗的批量收集能力。

  1. 设计简单易维护,无需维护第三方服务

核心逻辑完全不依赖任何第三方库/服务,完全基于JDK库进行实现。

流程对比

以下两张图解释了有无折叠执行器的调用差异。当无请求折叠时,请求与网络连接数的比例为1:1;当使用请求折叠后,请求与网络连接数的比例为N:1,即多个请求会合并为一个请求发起远程调用,由此可以做到减少I/O次数、减少后端压力,从而提升调用性能降低RT。

无请求折叠

有请求折叠

快速开始

必备条件: JDK8及以上

一.入门: 自动折叠及拆分

该方式适用于简单的幂等请求的场景,通常需要用户手动指定本次调用所属的并发分组。

以下该案例表示将当前传入的Callable按照 example group 进行分组。 同一并发分组下的Callable仅执行一次,并将这一次的返回结果作为同一并发分组发起的请求结果

1.同步阻塞调用

BlockingCollapseExecutorExample.java

public class BlockingCollapseExecutorExample {
 public static void main(String[] args) throws Throwable {
 BlockingCallableGroupCollapseExecutor blockingCollapseExecutor = new BlockingCallableGroupCollapseExecutor();
 String outputString = blockingCollapseExecutor.execute("example group", () -> "Hello World Collapse Executor. Blocking");
 System.out.println(outputString);
 }
}

2.异步调用

AsyncCollapseExecutorExample.java

public class AsyncCollapseExecutorExample {
 public static void main(String[] args) throws Throwable {
 AsyncCallableGroupCollapseExecutor asyncCallableGroupCollapseExecutor = new AsyncCallableGroupCollapseExecutor();
 asyncCallableGroupCollapseExecutor.setExecutor(new ThreadPoolExecutor(10, 10, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), r -> {
 Thread thread = new Thread(r);
 thread.setDaemon(true);
 return thread;
 }));
 asyncCallableGroupCollapseExecutor.execute("example group", () -> "Hello World Collapse Executor. Async")
 .thenAccept(System.out::println)
 .thenRun(() -> System.exit(0));
 System.in.read();
 }
}

3.非阻塞异步调用

FutureCollapseExecutorExample.java

这种方式必须保证Callable中的处理逻辑是非阻塞的!!!

public class FutureCollapseExecutorExample {
 public static void main(String[] args) throws Throwable {
 FutureCallableGroupCollapseExecutor futureCollapseExecutor = new FutureCallableGroupCollapseExecutor();
 futureCollapseExecutor.execute("example group", () -> CompletableFuture.completedFuture("Hello World Collapse Executor. Future"))
 .thenAccept(System.out::println)
 .thenRun(() -> System.exit(0));
 System.in.read();
 }
}

二. 进阶: 手动折叠及拆分

该方式适用于后端服务提供了批处理接口的场景,将同并发下其他线程的输入合并调用后端服务的批处理接口,可以减少多次不必要的单次调用,如批量查询。
由于这种方式可以更好的处理输入组,故该方式合并效率可以更高,由此带来的性能提升也会更高。

1.同步阻塞调用

CustomBlockingCollapseExecutor.java

public class CustomBlockingCollapseExecutor extends CollapseExecutorBlockingSupport<Long, UserEntity, Map<Long, UserEntity>> {
 @Override
 protected Map<Long, UserEntity> doExecute(Collection<Input<Long>> inputs) {
 //此处编写批量请求逻辑
 return null;
 }
 @Override
 protected void bindingOutput(Map<Long, UserEntity> users, List<Bundle<Long, UserEntity>> bundles) {
 //此处编写批量响应与原始请求关联的逻辑
 }
}

2.异步调用

CustomAsyncCollapseExecutor.java
与同步阻塞调用类似,主要差异为需要设置一个异步线程池,用于执行批量请求逻辑。

public class CustomBlockingCollapseExecutor extends CollapseExecutorAsyncSupport<Long, UserEntity, Map<Long, UserEntity>> {
 @Override
 protected Map<Long, UserEntity> doExecute(Collection<Input<Long>> inputs) {
 //此处编写批量请求逻辑
 return null;
 }
 @Override
 protected void bindingOutput(Map<Long, UserEntity> users, List<Bundle<Long, CompletableFuture<UserEntity>>> bundles) {
 for (Bundle<Long, CompletableFuture<UserEntity>> bundle : bundles) {
 Long inputId = bundle.getInput();
 UserEntity userEntity = users.get(inputId);
 //需要返回CompletableFuture类型
 bundle.bindOutput(CompletableFuture.completedFuture(userEntity));
 }
 }
}

三. Spring Boot集成

collapse executor已经对Spring Boot进行了适配,利用Spring Boot自动装配能力简化了在Spring环境下的使用体验,通过简单的配置即可使用。
在Spring Boot中,collapse executor对主要的几个组件进行了适配,分别是 RestTemplateWebClientServlet

1. application.yaml配置解释

以下是一个折叠执行器的yaml配置例子及解释,详情可参考collapse-executor-sample-spring-boot中的application.yaml

collapse:
 executor:
 enabled: true # 折叠执行器的总开关,配置为false后,后面的所有配置(servlet、rest-template、web-client)将失效
 wait-threshold: 10 #批量收集的最小阈值
 collecting-wait-time: 0 #声明批量收集未达到阈值时的行为。
 #collecting-wait-time < 0时:不做任何等待,立即发起调用
 #collecting-wait-time = 0时:让出当前收集线程时间片等待下次调度后再发起调用
 #collecting-wait-time > 0时:等待指定的时间后再发起调用,单位为毫秒(ms)
 rest-template:
 enabled: true #true表示打开RestTemplate的合并拦截器
 collapse-policies:
 #声明合并策略,可以配置多个
 sample-policy1: #策略名字
 collapse-request-headers: #声明需要合并的请求头名字
 - authorization
 collapse-request-queries: #声明需要合并的查询参数名字
 - sample
 sample-policy2:
 collapse-request-headers:
 - user-id
 collapse-request-queries:
 - sample
 collapse-groups:
 # collapse-policy-name可以省略,省略后使用默认策略仅合并path相同的请求,而忽略其他任何参数
 # 例如:此时并发发起 /user/2、/user/2、/article/2、/article/2 [4]个请求,由于前两个请求满足 /user/*,则会将前两个合并为 [1] 个请求发起调用;
 # 而第三第四个/article/2请求没有匹配到配置中的声明的折叠组,则依然会按照 [2] 个请求分别发起调用
 - uris:
 - /user/*
 - /test/noop*
 #------------------------------------------------------------------
 
 # 例如:此时并发发起 /samples/1(header:authorization=test), /samples/1(header:authorization=test), /samples/1(header:authorization=demo) [3]个请求,
 # 由于前两个请求携带的[authorization]请求头值相同,则会将前两个合并为 [1] 个请求发起调用;
 # 而第三个请求则会单独发起调用,与前两个不是同一组!
 - collapse-policy-name: sample-policy1 #需要与前面声明的策略名对应
 uris:
 - /samples/*

2.配置拦截器

@Configuration
public class SampleConfiguration {
 /**
 * 基于RestTemplate的使用方式
 */
 @Bean
 public RestTemplate restTemplate(CollapseHttpRequestInterceptor collapseHttpRequestInterceptor) {
 //注入CollapseHttpRequestInterceptor拦截器实例,并添加到RestTemplate实例上
 return new RestTemplateBuilder()
 .interceptors(collapseHttpRequestInterceptor)
 .build();
 }
 /**
 * 基于WebClient的使用方式
 */
 @Bean
 public WebClient webClient(CollapseExchangeFilterFunction exchangeFilterFunction) {
 //注入CollapseExchangeFilterFunction过滤器实例,并添加到WebClient实例上
 return WebClient.builder().filter(exchangeFilterFunction).build();
 }
}

3. 启动SpringBootSampleApplication查看结果

业务逻辑位于 AbstractBlockingCallSample

public abstract class AbstractBlockingCallSample {
 //已省略前后无关代码
 @EventListener(ApplicationReadyEvent.class)
 public void processOnStarted() {
 UriComponentsBuilder baseUriBuilder = UriComponentsBuilder.fromUriString("http://localhost").port(serverPort);
 try {
 System.out.println("--------------------------------[" + prefix + "] start----------------------------------");
 //查询id为1-50之间的所有用户,预期打印结果为执行了50次
 queryId1between50(executorService, baseUriBuilder);
 //批处理查询id为1-50之间的所有用户,预期打印结果小于50次
 queryId1between50Batch();
 //单条查询id为1-2之间的所有用户,预期打印结果小于50次
 queryId1between2(executorService, baseUriBuilder);
 System.out.println("--------------------------------[" + prefix + "] end------------------------------------\n\n\n");
 } catch (InterruptedException e) {
 throw new RuntimeException(e);
 }
 }
}

性能对比

一、Servlet

结论:相比于未使用折叠,TPS提升高达94%

服务参数
server.tomcat.threads.max=200
服务地址(位于 collapse-executor-samples 中的 StressTestController, 后端均延迟[100ms]后响应,逻辑一致无差异)
http://localhost:8080/test/collapse100
http://localhost:8080/test/noop100
测试参数
400用户线程数,持续压测5分钟

开启折叠

http://localhost:8080/test/collapse100 启用请求折叠测试结果
TPS 3785/s
RT99 115ms

with-collapse

关闭折叠

http://localhost:8080/test/noop100 关闭请求折叠测试结果
TPS 1951/s
RT99 211ms

without-collapse

二、RestTemplate

结论:相比于未使用折叠,TPS提升高达80%

RestTemplate默认配置,差异仅为是否包含折叠执行拦截器(CollapseHttpRequestInterceptor)
RestTemplate调用地址http://localhost:8080/test/noop0
200用户线程数,持续压测1分钟

开启折叠

启用请求折叠测试结果
TPS 15282/s
RT99 59ms

with-collapse

关闭折叠

启用请求折叠测试结果
TPS 8455/s
RT99 180ms

without-collapse

三、WebClient

结论:相比于未使用折叠,TPS提升高达43%

WebClient默认配置,差异仅为是否包含折叠执行拦截器(CollapseExchangeFilterFunction)
WebClient调用地址http://localhost:8080/test/noop0
200用户线程数,持续压测1分钟

开启折叠

启用请求折叠测试结果
TPS 14276/s
RT99 35ms

with-collapse

关闭折叠

启用请求折叠测试结果
TPS 9971/s
RT99 111ms

without-collapse

工作流程

处理流程

About

collapse executor 是一个高性能、低延迟的批量合并执行器,可有效支持高并发的热点请求,支持与Spring Boot集成,帮助开发者快速构建高性能的微服务,提高服务资源利用率的同时降低服务响应时间。涉及技术点包括CompletableFuture、Spring Boot、WebClient、Servlet Async等主流技术栈

Topics

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages

AltStyle によって変換されたページ (->オリジナル) /