开源 企业版 高校版 私有云 模力方舟 AI 队友
代码拉取完成,页面将自动刷新
捐赠
捐赠前请先登录
扫描微信二维码支付
取消
支付完成
支付提示
将跳转至支付宝完成支付
确定
取消
4 Star 4 Fork 3

阿啄debugIT/batchThreadTask

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
已有帐号? 立即登录
阿啄debugIT
分支 (1)
阿啄debugIT
克隆/下载
克隆/下载
提示
下载代码请复制以下命令到终端执行
为确保你提交的代码身份被 Gitee 正确识别,请执行以下命令完成配置
初次使用 SSH 协议进行代码克隆、推送等操作时,需按下述提示完成 SSH 配置
1 生成 RSA 密钥
2 获取 RSA 公钥内容,并配置到 SSH公钥
在 Gitee 上使用 SVN,请访问 使用指南
使用 HTTPS 协议时,命令行会出现如下账号密码验证步骤。基于安全考虑,Gitee 建议 配置并使用私人令牌 替代登录密码进行克隆、推送等操作
Username for 'https://gitee.com': userName
Password for 'https://userName@gitee.com': # 私人令牌
贡献代码
同步代码
对比差异 通过 Pull Request 同步
同步更新到分支
通过 Pull Request 同步
将会在向当前分支创建一个 Pull
Request,合入后将完成同步
提示: 由于 Git 不支持空文件夾,创建文件夹后会生成空的 .keep 文件
Loading...
README
未知许可证
# batchThreadTask #### 介绍 batchThreadTask一个适合绝大多数场景下的批量任务线程池 #### 软件架构 软件架构说明 在工作中的一个场景中,需要紧急处理五千多份合同,合同处理的过程有些复杂,这里说的复杂不是代码复杂,是中间需要经过多个服务渠道,包括对外对接的一些业务,所以这五千来分如果同步处理的话,估计要跑上估计至少也得半天的时间了,而后期确定了还会面临同样的问题(坑爹的代码与凌乱的架构问题),因此写了一个处理合同的线程池,越写兴趣越浓,最后写出来以后发现这个鸟玩意儿实在是太通用了,几乎可以用在所有场景下的批量任务。 #### 使用说明 这个线程池可以说是为批量任务量身定做的一套方案,并且几乎可以实现任何场景下的批量任务。大体分为4个部分: 约束者 执行者 管理者 发起者 #### 安装教程 约束者主要是对整个线程执行类的一个约束,他定义了公共的一致的接口,方便其他角色调度,我给他起名为ThreadTask的一个借口类,以下是代码,其中T表示要处理的数据类型: ``` package com.cnpany.common.util.thread; /** * 多线程任务 * @author 郭胜凯 * @time 2017年10月28日 上午11:28:23 * @email guoshengkai@shoujinwang.com */ public interface ThreadTask<T> { /** * 开始线程 */ void start(); /** * 结束线程 */ void stop(); /** * 获得任务数 * @return */ int getTaskCount(); /** * 获得线程数 * @return */ int getThreadCount(); /** * 等待完成 * @return * 等待时间 */ long doWait(); } ``` 执行者 执行者类似一个员工的角色,他负责执行整个批量任务的实现,我更喜欢叫他机器人所以,他也是一个接口类,这个类只有一个excute方法,下面是这个"机器人"的具约束码,其中T表示要处理的数据类型,K表示在处理过程中需要用到的对象。因为我需要对数据库以及其他的一些远程业务进行操作,所以我需要把Service交给这个机器人,让机器人用这个Service去做一些事情。 ``` package com.cnpany.common.util.thread; import java.util.List; /** * 线程工作者 * @author 郭胜凯 * @time 2017年10月28日 上午11:25:05 * @email guoshengkai@shoujinwang.com */ public interface ThreadRobot<T, K> { /** * 工作函数 * @param t */ void excute(List<T> queue, K param); } ``` 管理者 这里的管理者,是针对Thread Task这个接口的实现,我给他起名为BatchThreadTask,也就是说,这个管理者承担一个批处理任务的管理工作,当然,根据不同的业务需求,也可以创建更多的管理者,不签这个批处理的角色几乎可以实现绝大多数业务的批量操作了,下面是具体代码: ``` package com.cnpany.common.util.thread.impl; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.locks.Lock; import com.cnpany.common.util.thread.ThreadRobot; import com.cnpany.common.util.thread.ThreadTask; import com.cnpany.common.util.thread.exception.NotStartedException; import com.cnpany.common.util.thread.exception.NullTaskException; /** * 批量线程任务 * @author 郭胜凯 * @time 2017年10月28日 上午11:33:05 * @email guoshengkai@shoujinwang.com */ public class BatchThreadTask<T, K> implements ThreadTask<T> { /** * 消费内容 */ private LinkedList<T> param = new LinkedList<>(); /** * 最大线程数 */ private int maxThread = 3; /** * 当前线程数 */ private int thisThread = 0; /** * 执行机器人 */ private ThreadRobot<T, K> roboot; private long startTime = 0; private K member = null; private Lock lock = null; private boolean over = false; /** * 创建批量任务 * @param roboot * 任务执行机器人 * @param maxThread * 最大机器人数量 * @param param * 消费内容 */ public BatchThreadTask(ThreadRobot<T, K> roboot, int maxThread, List<T> param, Lock lock, K member) { if(null == param || param.isEmpty()) { throw new NullTaskException(); } if(maxThread > 0) { this.maxThread = maxThread; } for (T t : param) { this.param.addLast(t); } this.roboot = roboot; this.lock = lock; this.member = member; } @Override public void start() { final int average = param.size() / maxThread; final int surplus = param.size() % maxThread; startTime = System.currentTimeMillis(); for(int i = 1; i <= maxThread; i++) { final int co = i; Thread t = new Thread(new Runnable() { @Override public void run() { List<T> p = new ArrayList<>(); try { for(int j = ((co - 1) * average); j < co * average; j ++) { p.add(getList()); } if(co == maxThread) { for(int j = 0; j < surplus; j++) { try { T item = getList(); if(null != item) { p.add(item); } }catch (NoSuchElementException e) { e.fillInStackTrace(); } } } roboot.excute(p, member); }finally { synchronized (lock) { thisThread --; if(thisThread == 0) { over = true; } } } } }); t.start(); thisThread ++; } } @Override public void stop() { //不实现 } @Override public int getTaskCount() { return param.size(); } @Override public int getThreadCount() { // TODO Auto-generated method stub return thisThread; } @Override public long doWait() { while(!over) { try { Thread.sleep(10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } continue; } if(startTime == 0) { throw new NotStartedException(); } return System.currentTimeMillis() - startTime; } private T getList() { synchronized (lock) { return param.removeFirst(); } } } ``` 发起者 发起者是一个工厂类,他的作用就是通过创建管理者的角色让它去执行你交给他的任务。其实很简单,不多说了,上代码: ``` package com.cnpany.common.util.thread.factory; import java.util.List; import java.util.concurrent.locks.ReentrantLock; import com.cnpany.common.util.thread.ThreadRobot; import com.cnpany.common.util.thread.ThreadTask; import com.cnpany.common.util.thread.impl.BatchThreadTask; /** * 线程任务工厂 * @author 郭胜凯 * @time 2017年10月28日 上午11:27:25 * @email guoshengkai@shoujinwang.com */ public class ThreadTaskFactory { /** * 创建批量任务线程池 * @param roboot * 工作机器人 * @param maxThread * 线程数 * @param quqe * 消费队列 * @param param * 机器人用到的参数 * @return */ public static<T, K> ThreadTask<T> createBatch(ThreadRobot<T, K> roboot, int maxThread, List<T> quqe, K param) { return new BatchThreadTask<T, K>(roboot, maxThread, quqe, new ReentrantLock(), param); } } OK!到此为止,这个线程池就算是完工了,具体怎么去用呢,我们只需要通过ThreadTaskFactory来创建这么一个批处理任务,也就是传机器人(工作者)需要做的实现逻辑就好,这里我是直接这么做的: /** * 创建满标合同任务,4分钟秒扫描一次 */ @Scheduled(cron = "0 0/4 * * * ? ") public void newFullScaleContract() { if(!fullIng) { fullIng = true; //扫描未生成满标合同的标的 List<String> projectNos = commonSignService.listFullScaleProjectNo(); logger.info("-----------[满标合同] - 得到未生成满标合同的标的"); //满标合同批量任务线程池 ThreadTask<String> batchWork = ThreadTaskFactory.createBatch(new ThreadRobot<String, CommonSignService>() { @Override public void excute(List<String> queue, CommonSignService service) { for (String projectNo : queue) { service.getContractByProjectNo(projectNo); } } }, 10, projectNos, commonSignService); batchWork.start(); logger.info("-----------[满标合同] - 开始批量处理满标合同"); batchWork.doWait(); logger.info("-----------[满标合同] - 满标合同处理完成"); fullIng = false; } } ``` 可以看到,我这里把需要处理的记录ID查出来以后,直接把这些ID以及处理这些ID需要用到的CommonSignService丢到Factory,并且把机器人的实现代码穿进去就得到ThreadTask这个对象了,这个对象就是我们需要的批处理线程池。我直接调用start()方法就开始处理的,这个时候,整个线程的执行是异步的,不影响主体流程向下执行。这个时候我再调用doWait方法,则可以等待整个批处理任务全部完成后再向下执行,这个方法是有阻塞效果的,同时这个方法执行完毕后,会返回long类型的毫秒值,这个值就是整个任务执行的时间(非阻塞时间)。 总结 到此为止,整个的线程机制算是完成了,以后类似的需求我只需要通过ThreadTaskFactory来创建一个线程池,仅需几行代码就可以实现整个大业务的批量处理工作了。 #### 参与贡献 1. Fork 本仓库 2. 新建 Feat_xxx 分支 3. 提交代码 4. 新建 Pull Request #### 码云特技 1. 使用 Readme\_XXX.md 来支持不同的语言,例如 Readme\_en.md, Readme\_zh.md 2. 码云官方博客 [blog.gitee.com](https://blog.gitee.com) 3. 你可以 [https://gitee.com/explore](https://gitee.com/explore) 这个地址来了解码云上的优秀开源项目 4. [GVP](https://gitee.com/gvp) 全称是码云最有价值开源项目,是码云综合评定出的优秀开源项目 5. 码云官方提供的使用手册 [https://gitee.com/help](https://gitee.com/help) 6. 码云封面人物是一档用来展示码云会员风采的栏目 [https://gitee.com/gitee-stars/](https://gitee.com/gitee-stars/)
举报
举报成功
我们将于2个工作日内通过站内信反馈结果给你!
请认真填写举报原因,尽可能描述详细。
请选择举报类型
取消
发送
误判申诉

此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。

如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。

取消
提交

简介

batchThreadTask一个适合绝大多数场景下的批量任务线程池 在工作中的一个场景中,需要紧急处理五千多份合同,合同处理的过程有些复杂,这里说的复杂不是代码复杂,是中间需要经过多个服务渠道,包括对外对接的一些业务,所以这五千来分如果同步处理的话,估计要跑上估计至少也得半天的时间了,而后期确定了还会面临同样的问题(坑爹的代码与凌乱的架构问题),因此写了一个处理合同的线程池,越写兴趣越浓,最后写出来以后发现这个鸟玩意儿实在是太通用了,几乎可以用在所有场景下的批量任务。 这个线程池可以说是为批量任务量身定做的一套方案,并且几乎可以实现任何场景下的批量任务
暂无标签
README
未知许可证
查看未知开源许可协议
取消

发行版

暂无发行版

贡献者

全部

近期动态

不能加载更多了
编辑仓库简介
简介内容
主页
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Java
1
https://gitee.com/158cosmos/batchThreadTask.git
git@gitee.com:158cosmos/batchThreadTask.git
158cosmos
batchThreadTask
batchThreadTask
阿啄debugIT
点此查找更多帮助

搜索帮助

评论
仓库举报
回到顶部
登录提示
该操作需登录 Gitee 帐号,请先登录后再操作。
立即登录
没有帐号,去注册

AltStyle によって変換されたページ (->オリジナル) /