diff --git "a/Disruptor/Disruptor345円271円277円346円222円255円346円250円241円345円274円217円344円270円216円346円211円247円350円241円214円351円241円272円345円272円217円351円223円276円346円272円220円347円240円201円345円210円206円346円236円220円.md" "b/Disruptor/Disruptor345円271円277円346円222円255円346円250円241円345円274円217円344円270円216円346円211円247円350円241円214円351円241円272円345円272円217円351円223円276円346円272円220円347円240円201円345円210円206円346円236円220円.md" new file mode 100644 index 0000000..25d4ed3 --- /dev/null +++ "b/Disruptor/Disruptor345円271円277円346円222円255円346円250円241円345円274円217円344円270円216円346円211円247円350円241円214円351円241円272円345円272円217円351円223円276円346円272円220円347円240円201円345円210円206円346円236円220円.md" @@ -0,0 +1,702 @@ +> 【源码笔记】专注于Java后端系列框架源码分析,Github地址:https://github.com/yuanmabiji/Java-SourceCode-Blogs + +# 1 前言 + +本篇文章开始`Disruptor`的源码分析,理解起来相对比较困难,特别是`Disruptor`的`sequenceBarrier`的理解,`sequenceBarrier`包括生产者与消费者之间的`gatingSequence`以及消费者与消费者之间的`dependentSequence`。此外,`Disruptor`源码中的`sequence`变量也比较多,需要捋清楚各种`sequence`的含义。最后,建议小伙伴们动手调试理解,效果会更好。 + +# 2 Disruptor六边形DEMO + +分析源码前,先来看看`Disruptor`六边形执行器链的`DEMO`。 + +```java +public class LongEventMain +{ + private static final int BUFFER_SIZE = 1024; + public static void main(String[] args) throws Exception + { + // 1,构建disruptor + final Disruptor disruptor = new Disruptor( + new LongEventFactory(), + BUFFER_SIZE, + Executors.newFixedThreadPool(5), // 【注意点】线程池需要保证足够的线程:有多少个消费者就要有多少个线程,否则有些消费者将不会执行,生产者可能也会一直阻塞下去 + ProducerType.SINGLE, + new YieldingWaitStrategy() + ); + + EventHandler eventHandler1 = new LongEventHandler1(); + EventHandler eventHandler2 = new LongEventHandler2(); + EventHandler eventHandler3 = new LongEventHandler3(); + EventHandler eventHandler4 = new LongEventHandler4(); + EventHandler eventHandler5 = new LongEventHandler5(); + + // 方式1 构建串行执行顺序: + /*disruptor + .handleEventsWith(eventHandler1) + .handleEventsWith(eventHandler2) + .handleEventsWith(eventHandler3) + .handleEventsWith(eventHandler4) + .handleEventsWith(eventHandler5);*/ + + // 方式2 构建并行执行顺序 + /*disruptor + .handleEventsWith(eventHandler1, eventHandler2, eventHandler3, eventHandler4, eventHandler5);*/ + + // 方式3 构建菱形执行顺序 + /*disruptor.handleEventsWith(eventHandler1, eventHandler2) + .handleEventsWith(eventHandler3);*/ + + // 2,构建eventHandler执行链 + // 方式4 构建六边形执行顺序 + disruptor.handleEventsWith(eventHandler1, eventHandler3); + disruptor.after(eventHandler1).handleEventsWith(eventHandler2); + disruptor.after(eventHandler3).handleEventsWith(eventHandler4); + disruptor.after(eventHandler2, eventHandler4).handleEventsWith(eventHandler5); + + // 3, 启动disruptor即启动线程池线程执行BatchEventProcessor任务 + disruptor.start(); + + // 4,生产者往ringBuffer生产数据并唤醒所有的消费者消费数据 + RingBuffer ringBuffer = disruptor.getRingBuffer(); + ByteBuffer bb = ByteBuffer.allocate(8); + bb.putLong(0, 666); + ringBuffer.publishEvent(new LongEventTranslatorOneArg(), bb); + } + + static class LongEventTranslatorOneArg implements EventTranslatorOneArg { + @Override + public void translateTo(LongEvent event, long sequence, ByteBuffer buffer) { + event.set(buffer.getLong(0)); + } + } + + static class LongEvent + { + private long value; + + public void set(long value) + { + this.value = value; + } + + public long get() { + return this.value; + } + } + + static class LongEventFactory implements EventFactory + { + @Override + public LongEvent newInstance() + { + return new LongEvent(); + } + } + + static class LongEventHandler1 implements EventHandler + { + @Override + public void onEvent(LongEvent event, long sequence, boolean endOfBatch) + { + System.out.println("LongEventHandler1-" + event.get() + " executed by " + Thread.currentThread().getName()); + } + } + + static class LongEventHandler2 implements EventHandler + { + @Override + public void onEvent(LongEvent event, long sequence, boolean endOfBatch) + { + System.out.println("LongEventHandler2-" + event.get() + " executed by " + Thread.currentThread().getName()); + } + } + + static class LongEventHandler3 implements EventHandler + { + @Override + public void onEvent(LongEvent event, long sequence, boolean endOfBatch) + { + System.out.println("LongEventHandler3-" + event.get() + " executed by " + Thread.currentThread().getName()); + } + } + + static class LongEventHandler4 implements EventHandler + { + @Override + public void onEvent(LongEvent event, long sequence, boolean endOfBatch) + { + System.out.println("LongEventHandler4-" + event.get() + " executed by " + Thread.currentThread().getName()); + } + } + + static class LongEventHandler5 implements EventHandler + { + @Override + public void onEvent(LongEvent event, long sequence, boolean endOfBatch) + { + System.out.println("LongEventHandler5-" + event.get() + " executed by " + Thread.currentThread().getName()); + } + } +} +``` +这个`Demo`也是`Disruptor`广播模式与执行顺序链构建的`Demo`,有以下值得注意的点: +1. 生产者总是要把`RingBuffer`填充完一圈后才会考虑追赶消费者进度的问题; +2. 线程池需要保证足够的线程:有多少个消费者就要有多少个线程,否则有些消费者将不会执行(消费者线程起不来),生产者生产完一圈`RingBuffer`后即使有新的数据生产者也会一直阻塞下去; +3. 消费者执行链中,每个消费者都是独立的消费线程,决定当前消费者消不消费的只有其依赖的消费者有无消费完,消费者进行消费第二个数据时无须等整个执行链执行完才能消费。比如有执行链:A->B-C,生产者在`Ringbuffer`中生产了2个数据,那么消费顺序可能为A->B->C->A->B-C,也可能为A->B-A->B->C->C,也可能为A->A->B->B->C->C等。 +4. 生产者填充完第一圈`Ringbuffer`后,当要追赶消费者消费速度时,此时生产者能否继续生产取决于执行链最后一个消费者的消费速度。比如有执行链:A->B-C,生产者的生产速度取决于消费者C的消费速度。 + +# 3 初始化Disruptor实例 + +先来看下前面DEMO中的初始化`Disruptor`实例代码: + +```java +// 1,构建disruptor +final Disruptor disruptor = new Disruptor( + new LongEventFactory(), + BUFFER_SIZE, + Executors.newFixedThreadPool(5), // 线程池需要保证足够的线程 + ProducerType.SINGLE, + new YieldingWaitStrategy() +); +``` + +这句代码最终是给`Disruptor`的`ringBuffer`和`executor`属性赋值: + +```java +// Disruptor.java +public Disruptor( + final EventFactory eventFactory, + final int ringBufferSize, + final Executor executor, + final ProducerType producerType, + final WaitStrategy waitStrategy) +{ + this( + // 创建RingBuffer实例 + RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy), + executor); +} + +private Disruptor(final RingBuffer ringBuffer, final Executor executor) +{ + this.ringBuffer = ringBuffer; + this.executor = executor; +} +``` + +那么`RingBuffer`实例又是如何创建的呢?我们来看下`RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy)`这句源码: + +```java +// RingBuffer.java +public static RingBuffer create( + final ProducerType producerType, + final EventFactory factory, + final int bufferSize, + final WaitStrategy waitStrategy) +{ + switch (producerType) + { + case SINGLE: + return createSingleProducer(factory, bufferSize, waitStrategy); + case MULTI: + return createMultiProducer(factory, bufferSize, waitStrategy); + default: + throw new IllegalStateException(producerType.toString()); + } +} +``` + +首先会根据`producerType`来创建不同的`Producer`,以创建`SingleProducerSequencer`实例为例进去源码看下: + +```java +// RingBuffer.java +public static RingBuffer createSingleProducer( + final EventFactory factory, + final int bufferSize, + final WaitStrategy waitStrategy) + { + // 1,创建SingleProducerSequencer实例 + SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy); + // 2,创建RingBuffer实例 + return new RingBuffer(factory, sequencer); + } +``` + +## 3.1 创建SingleProducerSequencer实例 + +首先创建了`SingleProducerSequencer`实例,给`SingleProducerSequencer`实例的`bufferSize`和`waitStrategy`赋初值; + +```java +// AbstractSequencer.java +// SingleProducerSequencer父类 +public AbstractSequencer(int bufferSize, WaitStrategy waitStrategy) +{ + this.bufferSize = bufferSize; + this.waitStrategy = waitStrategy; +} +``` + +此外,创建`SingleProducerSequencer`实例时还初始化了一个成员变量`cursor`: + +```java +protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); +``` + +即给`cursor`赋值了一个`Sequence`实例对象,`Sequence`是标识`RingBuffer`环形数组的下标,同时生产者和消费者也会同时维护各自的`Sequence`。最重要的是,**`Sequence`通过填充CPU缓存行避免了伪共享带来的性能损耗**,来看下其填充缓存行源码: + +```java +// Sequence.java +class LhsPadding +{ + // 左填充 + protected long p1, p2, p3, p4, p5, p6, p7; +} + +class Value extends LhsPadding +{ + // Sequence值 + protected volatile long value; +} + +class RhsPadding extends Value +{ + // 右填充 + protected long p9, p10, p11, p12, p13, p14, p15; +} + +public class Sequence extends RhsPadding +{ + // ... +} +``` + + + +## 3.2 创建RingBuffer实例 + +然后核心是创建`RingBuffer`实例,看看最终创建`RingBuffer`实例源码: + +```java +// RingBuffer.java +RingBufferFields( // RingBufferFields为RingBuffer父类 + final EventFactory eventFactory, + final Sequencer sequencer) +{ + this.sequencer = sequencer; + this.bufferSize = sequencer.getBufferSize(); + + if (bufferSize < 1) + { + throw new IllegalArgumentException("bufferSize must not be less than 1"); + } + if (Integer.bitCount(bufferSize) != 1) + { + throw new IllegalArgumentException("bufferSize must be a power of 2"); + } + + this.indexMask = bufferSize - 1; + // 【重要特性】内存预加载,内存池机制 + this.entries = (E[]) new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD]; + fill(eventFactory); +} +``` + +可以看到先前创建的`SingleProducerSequencer`实例作为构造参数传入给了`RingBuffer`实例的`sequencer`属性赋初值,然后最重要的是在创建`RingBuffer`实例时,会为`RingBuffer`的环形数组提前填充`Event`对象,即**内存池机制**: + +```java +// RingBuffer.java +private void fill(final EventFactory eventFactory) +{ + for (int i = 0; i < bufferSize; i++) + { + entries[BUFFER_PAD + i] = eventFactory.newInstance(); + } +} +``` + +内存池机制好处: + +1. 提前创建好复用的对象,减少程序运行时因为创建对象而浪费性能,其实也是一种空间换时间的思想; +2. 因为环形数组对象可复用,从而避免GC来提高性能。 + +# 4 构建执行顺序链 + +```java +// 2,构建eventHandler执行链:构建六边形执行顺序 +disruptor.handleEventsWith(eventHandler1, eventHandler3); +disruptor.after(eventHandler1).handleEventsWith(eventHandler2); +disruptor.after(eventHandler3).handleEventsWith(eventHandler4); +disruptor.after(eventHandler2, eventHandler4).handleEventsWith(eventHandler5); +``` + +![](https://common-ymbj.oss-cn-beijing.aliyuncs.com/Disruptor/2/1.png) + +再来看看`Disruptor`构建执行顺序链相关源码: + +先来看看`disruptor.handleEventsWith(eventHandler1, eventHandler3);`源码: + +```java +// Disruptor.java +public final EventHandlerGroup handleEventsWith(final EventHandler... handlers) +{ + return createEventProcessors(new Sequence[0], handlers); +} + +EventHandlerGroup createEventProcessors( + final Sequence[] barrierSequences, + final EventHandler[] eventHandlers) +{ + checkNotStarted(); + // 根据eventHandlers长度来创建多少个消费者Sequence实例,注意这个processorSequences是传递到EventHandlerGroup用于构建执行顺序链用的, + // 比如有执行顺序链:A->B,那么A的sequenct即processorSequences会作为B节点的barrierSequences即dependencySequence + final Sequence[] processorSequences = new Sequence[eventHandlers.length]; + // 新建了一个ProcessingSequenceBarrier实例返回 + // ProcessingSequenceBarrier实例作用:序号屏障,通过追踪生产者的cursorSequence和每个消费者( EventProcessor) + // 的sequence的方式来协调生产者和消费者之间的数据交换进度 + final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);// 如果构建执行顺序链比如A->B,那么barrierSequences是A消费者的sequence;如果是A,C->B,那么barrierSequences是A和C消费者的sequence + + for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) + { + final EventHandler eventHandler = eventHandlers[i]; + // 有多少个eventHandlers就创建多少个BatchEventProcessor实例(消费者), + // 但需要注意的是同一批次的每个BatchEventProcessor实例共用同一个SequenceBarrier实例 + final BatchEventProcessor batchEventProcessor = + new BatchEventProcessor(ringBuffer, barrier, eventHandler); + + if (exceptionHandler != null) + { + batchEventProcessor.setExceptionHandler(exceptionHandler); + } + // 将batchEventProcessor, eventHandler, barrier封装成EventProcessorInfo实例并加入到ConsumerRepository相关集合 + // ConsumerRepository作用:提供存储机制关联EventHandlers和EventProcessors + consumerRepository.add(batchEventProcessor, eventHandler, barrier); // // 如果构建执行顺序链比如A->B,那么B消费者也一样会加入consumerRepository的相关集合 + // 获取到每个消费的消费sequece并赋值给processorSequences数组 + // 即processorSequences[i]引用了BatchEventProcessor的sequence实例, + // 但processorSequences[i]又是构建生产者gatingSequence和消费者执行器链dependentSequence的来源 + processorSequences[i] = batchEventProcessor.getSequence(); + } + // 总是拿执行器链最后一个消费者的sequence作为生产者的gateingSequence + updateGatingSequencesForNextInChain(barrierSequences, processorSequences); + // 最终返回封装了Disruptor、ConsumerRepository和消费者sequence数组processorSequences的EventHandlerGroup对象实例返回 + return new EventHandlerGroup(this, consumerRepository, processorSequences); +} +``` + +构建`Disruptor`执行顺序链的核心逻辑就在这段源码中,我们缕一缕核心逻辑: + +1. 有多少个`eventHandlers`就创建多少个`BatchEventProcessor`实例(消费者),`BatchEventProcessor`消费者其实就是一个实现`Runnable`接口的线程实例; +2. 每个`BatchEventProcessor`实例(消费者)拥有前一个消费者的`sequence`作为其`sequenceBarrier`即`dependentSequence`; +3. 当前消费者的`sequence`通过`EventHandlerGroup`这个载体来传递给下一个消费者作为其`sequenceBarrier`即`dependentSequence`。 + +再来看看`diruptor.after(eventHandler1)`源码: + +```java +// Disruptor.java +public final EventHandlerGroup after(final EventHandler... handlers) +{ + // 获取指定的EventHandler的消费者sequence并赋值给sequences数组, + // 然后重新新建一个EventHandlerGroup实例返回(封装了前面的指定的消费者sequence被赋值 + // 给了EventHandlerGroup的成员变量数组sequences,用于后面指定执行顺序用) + final Sequence[] sequences = new Sequence[handlers.length]; + for (int i = 0, handlersLength = handlers.length; i < handlersLength; i++) + { + sequences[i] = consumerRepository.getSequenceFor(handlers[i]); + } + + return new EventHandlerGroup(this, consumerRepository, sequences); +} +``` + +这段源码做的事情也是将当前消费者`sequence`封装进`EventHandlerGroup`,从而可以通过这个载体来传递给下一个消费者作为其`sequenceBarrier`即`dependentSequence`。 + +最终构建的最终`sequence`依赖关系如下图,看到这个图不禁让我想起`AQS`的线程等待链即CLH锁的变相实现,附上文章链接,有兴趣的读者可以比对理解。[AQS基础——多图详解CLH锁的原理与实现](https://mp.weixin.qq.com/s/xBw7koGuZtqU8imZ9_JzDA) + +![](https://common-ymbj.oss-cn-beijing.aliyuncs.com/Disruptor/2/20220404235504.png) + +# 5 启动Disruptor实例 + +```java +// 3, 启动disruptor即启动线程池线程执行BatchEventProcessor任务 +disruptor.start(); +``` + +我们再来看看` disruptor.start()`这句源码: + +```java +// Disruptor.java +public RingBuffer start() +{ + checkOnlyStartedOnce(); + // 遍历每一个BatchEventProcessor消费者(线程)实例,并把该消费者线程实例跑起来 + for (final ConsumerInfo consumerInfo : consumerRepository) + { + consumerInfo.start(executor); + } + + return ringBuffer; +} +``` + +其实这里做的事情无非就是遍历每个消费者线程实例,然后启动每个消费者线程实例`BatchEventProcessor`,其中`BatchEventProcessor`被封装进`ConsumerInfo`实例。还没生产数据就启动消费线程的话,此时消费者会根据阻塞策略`WaitStrategy`进行阻塞。 + +# 6 生产消费数据 + +## 6.1 生产者生产数据 + +```java +// 4,生产者往ringBuffer生产数据并唤醒所有的消费者消费数据 +RingBuffer ringBuffer = disruptor.getRingBuffer(); +ByteBuffer bb = ByteBuffer.allocate(8); +bb.putLong(0, 666); +ringBuffer.publishEvent(new LongEventTranslatorOneArg(), bb); +``` + +生产者生产数据的源码在`ringBuffer.publishEvent(new LongEventTranslatorOneArg(), bb);`中。 + +```java +// RingBuffer.java +public void publishEvent(final EventTranslatorOneArg translator, final A arg0) +{ + // 【1】获取下一个RingBuffer中需填充数据的event对象的序号,对应生产者 + final long sequence = sequencer.next(); + // 【2】转换数据格式并生产数据并唤醒消费者 + translateAndPublish(translator, sequence, arg0); +} +``` + +### 6.1.1 生产者获取RingBuffer的sequence + +先来看下单生产者获取`sequence`的源码: + +```java +// SingleProducerSequencer.java +public long next(final int n) +{ + if (n < 1 || n> bufferSize) + { + throw new IllegalArgumentException("n must be> 0 and < bufferSize"); + } + // 总是拿到生产者已生产的当前序号 + long nextValue = this.nextValue; + // 获取要生产的下n个序号 + long nextSequence = nextValue + n; + // 生产者总是先有bufferSize个坑可以填,所以nextSequence - bufferSize + long wrapPoint = nextSequence - bufferSize; + // 拿到上一次的GatingSequence,因为是缓存,这里不是最新的 + long cachedGatingSequence = this.cachedValue; + // 如果生产者生产超过了消费者消费速度,那么这里自旋等待,这里的生产者生产的下标wrapPoint是已经绕了RingBuffer一圈的了哈 + if (wrapPoint> cachedGatingSequence || cachedGatingSequence> nextValue) + { + cursor.setVolatile(nextValue); // StoreLoad fence + + long minSequence; + // 自旋等待,其中gatingSequences是前面构建执行顺序链时的最后一个消费者的sequence + while (wrapPoint> (minSequence = Util.getMinimumSequence(gatingSequences, nextValue))) + { + LockSupport.parkNanos(1L); // TODO: Use waitStrategy to spin? + } + + this.cachedValue = minSequence; + } + // 将获取的nextSequence赋值给生产者当前值nextValue + this.nextValue = nextSequence; + + return nextSequence; +} +``` + +这段源码相对较难,我们缕一缕: + +1. 生产者把第一圈`RingBuffer`的坑填完后,此时生产者进入`RingBuffer`第2圈,如果消费者消费速度过慢,此时生产者很可能会追上消费者,如果追上消费者那么就让生产者自旋等待; + +2. 第1点的**如果消费者消费速度过慢**,对于构建了一个过滤器链的消费者中,那么指的是哪个消费者呢?指的就是执行器链最后执行的那个消费者,`gatingSequences`就是执行器链最后执行的那个消费者的`sequence`;**这个`gatingSequences`其实就是防止生产者追赶消费者的`sequenceBarrier`**; + + ![](https://common-ymbj.oss-cn-beijing.aliyuncs.com/Disruptor/2/20220404235652.png) + +3. 生产者总是先把第一圈`RingBuffer`填满后,才会考虑追赶消费者的问题,因此才有`wrapPoint> cachedGatingSequence`的评判条件。 + +前面是单生产者获取`sequence`的源码,对于多生产者`MultiProducerSequencer`的源码逻辑也是类似,只不过将生产者当前值`cursor`和`cachedGatingSequence`用了CAS操作而已,防止多线程问题。 + + + +### 6.1.2 生产者生产数据并唤醒消费者 + +再来看看` translateAndPublish(translator, sequence, arg0)`源码: + +```java +// RingBuffer.java +private void translateAndPublish(final EventTranslatorOneArg translator, final long sequence, final A arg0) +{ + try + { + // 【1】将相应数据arg0转换为相应的Eevent数据,其中get(sequence)会从RingBuffer数组对象池中取出一个对象,而非新建 + translator.translateTo(get(sequence), sequence, arg0); + } + finally + { + // 【2】发布该序号说明已经生产完毕供消费者使用 + sequencer.publish(sequence); + } +} + + + +// SingleProducerSequencer.java +public void publish(final long sequence) +{ + // 【1】给生产者cursor游标赋值新的sequence,说明该sequenc对应的对象数据已经填充(生产)完毕 + cursor.set(sequence);// 这个cursor即生产者生产时移动的游标,是AbstractSequencer的成员变量 + // 【2】根据阻塞策略将所有消费者唤醒 + // 注意:这个waitStrategy实例是所有消费者和生产者共同引用的 + waitStrategy.signalAllWhenBlocking(); +} + +``` + +生产者生产数据并唤醒消费者的注释已经写得很清楚了,这里需要注意的点: + +1. `cursor`才是生产者生产数据的当前下标,消费者消费速度有无追赶上生产者就是拿消费者的消费`sequence`跟生产者的`cursor`比较的,因此生产者生产数据完成后需要给`cursor`赋值; +2. `waitStrategy`策略对象时跟消费者共用的,这样才能线程间实现阻塞唤醒逻辑。 + +## 6.2 消费者消费数据 + +前面第4节启动`Disruptor`实例中讲到,其实就是开启各个消费者实例`BatchEventProcessor`线程,我们看看其`run`方法中的核心逻辑即`processEvents`源码: + +```java +// BatchEventProcessor.java +private void processEvents() +{ + T event = null; + // nextSequence:消费者要消费的下一个序号 + long nextSequence = sequence.get() + 1L; // 【重要】每一个消费者都是从0开始消费,各个消费者维护各自的sequence + // 消费者线程一直在while循环中不断获取生产者数据 + while (true) + { + try + { + // 拿到当前生产者的生产序号 + final long availableSequence = sequenceBarrier.waitFor(nextSequence); + if (batchStartAware != null) + { + batchStartAware.onBatchStart(availableSequence - nextSequence + 1); + } + // 如果消费者要消费的下一个序号小于生产者的当前生产序号,那么消费者则进行消费 + // 这里有一个亮点:就是消费者会一直循环消费直至到达当前生产者生产的序号 + while (nextSequence <= availableSequence) + { + event = dataProvider.get(nextSequence); + eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence); + nextSequence++; + } + // 消费完后设置当前消费者的消费进度,这点很重要 + // 【1】如果当前消费者是执行链的最后一个消费者,那么其sequence则是生产者的gatingSequence,因为生产者就是拿要生产的下一个sequence跟gatingSequence做比较的哈 + // 【2】如果当前消费者不是执行器链的最后一个消费者,那么其sequence作为后面消费者的dependentSequence + sequence.set(availableSequence); + } + catch (final TimeoutException e) + { + notifyTimeout(sequence.get()); + } + catch (final AlertException ex) + { + if (running.get() != RUNNING) + { + break; + } + } + catch (final Throwable ex) + { + handleEventException(ex, nextSequence, event); + sequence.set(nextSequence); + nextSequence++; + } + } +} +``` + + + +消费者线程起来后,然后进入死循环,持续不断从生产者处**批量**获取可用的序号,如果获取到可用序号后,那么遍历所有可用序号,然后调用`eventHandler`的`onEvent`方法消费数据,`onEvent`方法写的是消费者的业务逻辑。消费完后再设置当前消费者的消费进度,这点很重要,用于构建`sequenceBarrier`包括`gatingSequence`和`dependentSequence`。 + + + +下面再来看看消费者是怎么获取可用的序号的,继续看`sequenceBarrier.waitFor(nextSequence)`源码: + +```java +// ProcessingSequenceBarrier.java + +public long waitFor(final long sequence) + throws AlertException, InterruptedException, TimeoutException +{ + checkAlert(); + // availableSequence:获取生产者生产后可用的序号 + // sequence:消费者要消费的下一个序号 + // cursorSequence:生产者生产数据时的当前序号 + // dependentSequence:第一个消费者即前面不依赖任何消费者的消费者,dependentSequence就是生产者游标; + // 有依赖其他消费者的消费者,dependentSequence就是依赖的消费者的sequence + long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); + + if (availableSequence < sequence) + { + return availableSequence; + } + // 这个主要是针对多生产者的情形 + return sequencer.getHighestPublishedSequence(sequence, availableSequence); +} +``` + +可以看到`ProcessingSequenceBarrier`封装了`WaitStrategy`等待策略实例,此时消费者获取下一批可用序号的逻辑又封装在了`WaitStrategy`的`waitFor`方法中,以`BlockingWaitStrategy`为例来其实现逻辑: + +```java +// BlockingWaitStrategy.java + +public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence, SequenceBarrier barrier) + throws AlertException, InterruptedException +{ + long availableSequence; + // cursorSequence:生产者的序号 + // 第一重条件判断:如果消费者消费速度大于生产者生产速度(即消费者要消费的下一个数据已经大于生产者生产的数据时),那么消费者等待一下 + if (cursorSequence.get() < sequence) + { + lock.lock(); + try + { + while (cursorSequence.get() < sequence) + { + barrier.checkAlert(); + processorNotifyCondition.await(); + } + } + finally + { + lock.unlock(); + } + } + // 第一重条件判断:自旋等待 + // 即当前消费者线程要消费的下一个sequence大于其前面执行链路(若有依赖关系)的任何一个消费者最小sequence(dependentSequence.get()),那么这个消费者要自旋等待, + // 直到前面执行链路(若有依赖关系)的任何一个消费者最小sequence(dependentSequence.get())已经大于等于当前消费者的sequence时,说明前面执行链路的消费者已经消费完了 + while ((availableSequence = dependentSequence.get()) < sequence) + { + barrier.checkAlert(); + ThreadHints.onSpinWait(); + } + + return availableSequence; +} +``` + +可以看到,消费者获取下一批可用消费序号时,此时要经过两重判断: + +1. 第一重判断:**消费者消费的序号不能超过当前生产者消费当前生产的序号**,否则消费者就阻塞等待;当然,这里因为是`BlockingWaitStrategy`等待策略的实现,如果是其他策略,比如`BusySpinWaitStrategy`和`YieldingWaitStrategy`的话,这里消费者是不会阻塞等待的,而是自旋,因此这也是其无锁化的实现了,但就是很耗CPU而已; +2. 第二重判断:**消费者消费的序号不能超过其前面依赖的消费消费的序号**,否则其自旋等待。因为这里是消费者等消费者,按理说前面消费者应该会很快处理完,所以不用阻塞等待;但是消费者等待生产者的话,如果生产者没生产数据的话,消费者还是自旋等待的话会比较浪费CPU,所以对于`BlockingWaitStrategy`策略,是阻塞等待了。 + +# 7 WaitStrategy等待策略 + +最后,再来看下`WaitStrategy`有哪些实现类: + +![](https://common-ymbj.oss-cn-beijing.aliyuncs.com/Disruptor/2/20220404185331.png) + +可以看到消费者的`WaitStrategy`等待策略有8种实现类,可以分为有锁和无锁两大类,然后每一种都有其适用的场合,没有最好的`WaitStrategy`等待策略,只有适合自己应用场景的等待策略。因为其源码不是很难,这里不再逐一分析。 + + + +> `disruptor`中文源码注释地址:https://github.com/yuanmabiji/disruptor diff --git a/Disruptor/README.md b/Disruptor/README.md new file mode 100644 index 0000000..b276848 --- /dev/null +++ b/Disruptor/README.md @@ -0,0 +1,18 @@ + +【**源码笔记**】专注于Java后端系列框架的源码分析。若觉得源码分析文章不错,欢迎Star哦。 + + +================**Disruptor源码专题持续更新中...**==================== + +#### 目录 + +1. [初识Disruptor框架!](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/Disruptor/初识Disruptor框架.md) +2. 持续更新中... + +* Disruptor源码解析项目(带中文注释):https://github.com/yuanmabiji/disruptor +* 更多源码分析文章请跳转至:https://github.com/yuanmabiji/Java-SourceCode-Blogs + + + + + diff --git a/Disruptor/images/img.png b/Disruptor/images/img.png new file mode 100644 index 0000000..3f07d95 Binary files /dev/null and b/Disruptor/images/img.png differ diff --git a/Disruptor/images/img_1.png b/Disruptor/images/img_1.png new file mode 100644 index 0000000..8091ab2 Binary files /dev/null and b/Disruptor/images/img_1.png differ diff --git "a/Disruptor/345円210円235円350円257円206円Disruptor346円241円206円346円236円266円.md" "b/Disruptor/345円210円235円350円257円206円Disruptor346円241円206円346円236円266円.md" new file mode 100644 index 0000000..b06975a --- /dev/null +++ "b/Disruptor/345円210円235円350円257円206円Disruptor346円241円206円346円236円266円.md" @@ -0,0 +1,148 @@ +>最近工作中参与了一个随机数分发平台的设计,考虑如何才能实现该平台的高并发性能,在技术实现选型中首先参考了百度的[uid-generator](https://github.com/baidu/uid-generator),其采用了双`RingBuffer`的实现形式,估计uid-generator的双`RingBuffer`也是借鉴了Disruptor的实现思想吧。因此,本系列文章我们一起来探究学习下2011年获得了Duke’s 程序框架创新奖的`Disruptor`框架。 + + +# 1 前言 +Martin Fowler在自己网站上写了一篇LMAX架构的文章,LMAX是一种运行在JVM平台上的新型零售金融交易平台,该平台能够以很低的延迟产生大量交易,大量交易是多少呢?单个线程达到了每秒处理6百万订单的TPS,虽然业务逻辑是纯内存操作,但每秒处理6百万订单的TPS已经高的惊人了。那么,是什么支撑了LMAX单个线程能达到每秒处理6百万订单呢?答案就是`Disruptor`。 + +`Disruptor`是一个开源的并发框架,其于2011年获得了Duke’s 程序框架创新奖,采用事件源驱动方式,能够在无锁的情况下实现网络的Queue并发操作。 + +# 2 Disruptor框架简介 + +`Disruptor`框架内部核心的数据结构是`Ring Buffer`,`Ring Buffer`是一个环形的数组,`Disruptor`框架以`Ring Buffer`为核心实现了异步事件处理的高性能架构;JDK的`BlockingQueue`相信大家都用过,其是一个阻塞队列,内部通过锁机制实现生产者和消费者之间线程的同步。跟`BlockingQueue`一样,`Disruptor`框架也是围绕`Ring Buffer`实现生产者和消费者之间数据的交换,只不过`Disruptor`框架性能更高,笔者曾经在同样的环境下拿`Disruptor`框架跟`ArrayBlockingQueue`做过性能测试,`Disruptor`框架处理数据的性能比`ArrayBlockingQueue`的快几倍。 + +`Disruptor`框架性能为什么会更好呢?其有以下特点: + +1. 预加载内存可以理解为使用了内存池; +2. 无锁化 +3. 单线程写 +4. 消除伪共享 +5. 使用内存屏障 +6. 序号栅栏机制 + + + +# 3 相关概念 + +![img.png](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/Disruptor/images/img.png?raw=true) + + +**Disruptor**:是使用`Disruptor`框架的核心类,持有`RingBuffer`、消费者线程池、消费者集合`ConsumerRepository`和消费者异常处理器`ExceptionHandler`等引用; + +**Ring Buffer**: `RingBuffer`处于`Disruptor`框架的中心位置,其是一个环形数组,环形数组的对象采用预加载机制创建且能重用,是生产者和消费者之间交换数据的桥梁,其持有`Sequencer`的引用; + + +**Sequencer**: `Sequencer`是`Disruptor`框架的核心,实现了所有并发算法,用于生产者和消费者之间快速、正确地传递数据,其有两个实现类`SingleProducerSequencer`和`MultiProducerSequencer`。 + +**Sequence**:`Sequence`被用来标识`Ring Buffer`和消费者`Event Processor`的处理进度,每个消费者`Event Processor`和`Ring Buffer`本身都分别维护了一个`Sequence`,支持并发操作和顺序写,其也通过填充缓存行的方式来消除伪共享从而提高性能。 + +**Sequence Barrier**:`Sequence Barrier`即为序号屏障,通过追踪生产者的`cursorSequence`和每个消费者(` EventProcessor`)的`sequence`的方式来协调生产者和消费者之间的数据交换进度,其实现类`ProcessingSequenceBarrier`持有的`WaitStrategy`等待策略类是实现序号屏障的核心。 + +**Wait Strategy**:`Wait Strategy`是决定消费者如何等待生产者的策略方式,当消费者消费速度过快时,此时是不是要让消费者等待下,此时消费者等待是通过锁的方式实现还是无锁的方式实现呢? + +**Event Processor**:`Event Processor`可以理解为消费者线程,该线程会一直从`Ring Buffer`获取数据来消费数据,其有两个核心实现类:`BatchEventProcessor`和`WorkProcessor`。 + +**Event Handler**:`Event Handler`可以理解为消费者实现业务逻辑的`Handler`,被`BatchEventProcessor`类引用,在`BatchEventProcessor`线程的死循环中不断从`Ring Buffer`获取数据供`Event Handler`消费。 + +**Producer**:生产者,一般用`RingBuffer.publishEvent`来生产数据。 + + + + + +# 4 入门DEMO +```java +// LongEvent.java +public class LongEvent +{ + private long value; + + public void set(long value) + { + this.value = value; + } + + public long get() { + return this.value; + } +} +``` + +```java +// LongEventFactory.java +public class LongEventFactory implements EventFactory +{ + @Override + public LongEvent newInstance() + { + return new LongEvent(); + } +} +``` + +```java +// LongEventHandler.java +public class LongEventHandler implements EventHandler +{ + @Override + public void onEvent(LongEvent event, long sequence, boolean endOfBatch) + { + System.out.println(new Date() + ":Event-" + event.get()); + } +} +``` + +```java +// LongEventTranslatorOneArg.java +public class LongEventTranslatorOneArg implements EventTranslatorOneArg { + @Override + public void translateTo(LongEvent event, long sequence, ByteBuffer buffer) { + event.set(buffer.getLong(0)); + } +} +``` + +```java +// LongEventMain.java +public class LongEventMain +{ + public static void main(String[] args) throws Exception + { + int bufferSize = 1024; + final Disruptor disruptor = new Disruptor( + new LongEventFactory(), + bufferSize, + Executors.newSingleThreadExecutor(), + ProducerType.SINGLE, + new YieldingWaitStrategy() + ); + + disruptor.handleEventsWith(new LongEventHandler()); + disruptor.start(); + + + RingBuffer ringBuffer = disruptor.getRingBuffer(); + ByteBuffer bb = ByteBuffer.allocate(8); + for (long l = 0; true; l++) + { + bb.putLong(0, l); + ringBuffer.publishEvent(new LongEventTranslatorOneArg(), bb); + Thread.sleep(1000); + } + } +} +``` +输出结果: + + +![img_1.png](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/Disruptor/images/img_1.png?raw=true) + +参考:https://lmax-exchange.github.io/disruptor/user-guide/index.html + + + +**若您觉得不错,请无情的转发和点赞吧!** + +【源码笔记】Github地址: + +https://github.com/yuanmabiji/Java-SourceCode-Blogs + diff --git "a/JUC/Java346円230円257円345円246円202円344円275円225円345円256円236円347円216円260円Future346円250円241円345円274円217円347円232円204円357円274円237円344円270円207円345円255円227円350円257円246円350円247円243円357円274円201円.md" "b/JUC/Java346円230円257円345円246円202円344円275円225円345円256円236円347円216円260円Future346円250円241円345円274円217円347円232円204円357円274円237円344円270円207円345円255円227円350円257円246円350円247円243円357円274円201円.md" new file mode 100644 index 0000000..81d4a0f --- /dev/null +++ "b/JUC/Java346円230円257円345円246円202円344円275円225円345円256円236円347円216円260円Future346円250円241円345円274円217円347円232円204円357円274円237円344円270円207円345円255円227円350円257円246円350円247円243円357円274円201円.md" @@ -0,0 +1,662 @@ +JDK1.8源码分析项目(中文注释)Github地址: + +https://github.com/yuanmabiji/jdk1.8-sourcecode-blogs +# 1 Future是什么? +先举个例子,我们平时网购买东西,下单后会生成一个订单号,然后商家会根据这个订单号发货,发货后又有一个快递单号,然后快递公司就会根据这个快递单号将网购东西快递给我们。在这一过程中,这一系列的单号都是我们收货的重要凭证。 + +因此,JDK的Future就类似于我们网购买东西的单号,当我们执行某一耗时的任务时,我们可以另起一个线程异步去执行这个耗时的任务,同时我们可以干点其他事情。当事情干完后我们再根据future这个"单号"去提取耗时任务的执行结果即可。因此Future也是多线程中的一种应用模式。 + +> **扩展**: 说起多线程,那么Future又与Thread有什么区别呢?最重要的区别就是Thread是没有返回结果的,而Future模式是有返回结果的。 + +# 2 如何使用Future +前面搞明白了什么是Future,下面我们再来举个简单的例子看看如何使用Future。 + +假如现在我们要打火锅,首先我们要准备两样东西:把水烧开和准备食材。因为烧开水是一个比较漫长的过程(相当于耗时的业务逻辑),因此我们可以一边烧开水(相当于另起一个线程),一边准备火锅食材(主线程),等两者都准备好了我们就可以开始打火锅了。 + +```java +// DaHuoGuo.java + +public class DaHuoGuo { + public static void main(String[] args) throws Exception { + FutureTask futureTask = new FutureTask(new Callable() { + @Override + public String call() throws Exception { + System.out.println(Thread.currentThread().getName() + ":" + "开始烧开水..."); + // 模拟烧开水耗时 + Thread.sleep(2000); + System.out.println(Thread.currentThread().getName() + ":" + "开水已经烧好了..."); + return "开水"; + } + }); + + Thread thread = new Thread(futureTask); + thread.start(); + + // do other thing + System.out.println(Thread.currentThread().getName() + ":" + " 此时开启了一个线程执行future的逻辑(烧开水),此时我们可以干点别的事情(比如准备火锅食材)..."); + // 模拟准备火锅食材耗时 + Thread.sleep(3000); + System.out.println(Thread.currentThread().getName() + ":" + "火锅食材准备好了"); + String shicai = "火锅食材"; + + // 开水已经稍好,我们取得烧好的开水 + String boilWater = futureTask.get(); + + System.out.println(Thread.currentThread().getName() + ":" + boilWater + "和" + shicai + "已经准备好,我们可以开始打火锅啦"); + } +} +``` +执行结果如下截图,符合我们的预期: +![](https://user-gold-cdn.xitu.io/2020/6/21/172d728d9ad5b1f1?w=1008&h=150&f=png&s=25755) + +从以上代码中可以看到,我们使用Future主要有以下步骤: +1. 新建一个`Callable`匿名函数实现类对象,我们的业务逻辑在`Callable`的`call`方法中实现,其中Callable的泛型是返回结果类型; +2. 然后把`Callable`匿名函数对象作为`FutureTask`的构造参数传入,构建一个`futureTask`对象; +3. 然后再把`futureTask`对象作为`Thread`构造参数传入并开启这个线程执行去执行业务逻辑; +4. 最后我们调用`futureTask`对象的`get`方法得到业务逻辑执行结果。 + +可以看到跟Future使用有关的JDK类主要有`FutureTask`和`Callable`两个,下面主要对`FutureTask`进行源码分析。 +> **扩展**: 还有一种使用`Future`的方式是将`Callable`实现类提交给线程池执行的方式,这里不再介绍,自行百度即可。 + +# 3 FutureTask类结构分析 +我们先来看下`FutureTask`的类结构: + +![](https://user-gold-cdn.xitu.io/2020/6/21/172d73948efa22e2?w=299&h=299&f=png&s=8011) +可以看到`FutureTask`实现了`RunnableFuture`接口,而`RunnableFuture`接口又继承了`Future`和`Runnable`接口。因为`FutureTask`间接实现了`Runnable`接口,因此可以作为任务被线程`Thread`执行;此外,**最重要的一点**就是`FutureTask`还间接实现了`Future`接口,因此还可以获得任务执行的结果。下面我们就来简单看看这几个接口的相关`api`。 +```java +// Runnable.java + +@FunctionalInterface +public interface Runnable { + // 执行线程任务 + public abstract void run(); +} +``` +`Runnable`没啥好说的,相信大家都已经很熟悉了。 +```java +// Future.java + +public interface Future { + /** + * 尝试取消线程任务的执行,分为以下几种情况: + * 1)如果线程任务已经完成或已经被取消或其他原因不能被取消,此时会失败并返回false; + * 2)如果任务还未开始执行,此时执行cancel方法,那么任务将被取消执行,此时返回true;TODO 此时对应任务状态state的哪种状态???不懂!! + * 3)如果任务已经开始执行,那么mayInterruptIfRunning这个参数将决定是否取消任务的执行。 + * 这里值得注意的是,cancel(true)实质并不能真正取消线程任务的执行,而是发出一个线程 + * 中断的信号,一般需要结合Thread.currentThread().isInterrupted()来使用。 + */ + boolean cancel(boolean mayInterruptIfRunning); + /** + * 判断任务是否被取消,在执行任务完成前被取消,此时会返回true + */ + boolean isCancelled(); + /** + * 这个方法不管任务正常停止,异常还是任务被取消,总是返回true。 + */ + boolean isDone(); + /** + * 获取任务执行结果,注意是阻塞等待获取任务执行结果。 + */ + V get() throws InterruptedException, ExecutionException; + /** + * 获取任务执行结果,注意是阻塞等待获取任务执行结果。 + * 只不过在规定的时间内未获取到结果,此时会抛出超时异常 + */ + V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException; +} +``` +`Future`接口象征着异步执行任务的结果即执行一个耗时任务完全可以另起一个线程执行,然后此时我们可以去做其他事情,做完其他事情我们再调用`Future.get()`方法获取结果即可,此时若异步任务还没结束,此时会一直阻塞等待,直到异步任务执行完获取到结果。 + +```java +// RunnableFuture.java + +public interface RunnableFuture extends Runnable, Future { + /** + * Sets this Future to the result of its computation + * unless it has been cancelled. + */ + void run(); +} +``` +`RunnableFuture`是`Future`和`Runnable`接口的组合,即这个接口表示又可以被线程异步执行,因为实现了`Runnable`接口,又可以获得线程异步任务的执行结果,因为实现了`Future`接口。因此解决了`Runnable`异步任务没有返回结果的缺陷。 + +接下来我们来看下`FutureTask`,`FutureTask`实现了`RunnableFuture`接口,因此是`Future`和`Runnable`接口的具体实现类,是一个可被取消的异步线程任务,提供了`Future`的基本实现,即异步任务执行后我们能够获取到异步任务的执行结果,是我们接下来分析的重中之重。`FutureTask`可以包装一个`Callable`和`Runnable`对象,此外,`FutureTask`除了可以被线程执行外,还可以被提交给线程池执行。 + +我们先看下`FutureTask`类的`api`,其中重点方法已经红框框出。 + +![](https://user-gold-cdn.xitu.io/2020/6/25/172e98c0611e7c60?w=389&h=429&f=png&s=27506) +上图中`FutureTask`的`run`方法是被线程异步执行的方法,`get`方法即是取得异步任务执行结果的方法,还有`cancel`方法是取消任务执行的方法。接下来我们主要对这三个方法进行重点分析。 +> **思考**: +1. `FutureTask`覆写的`run`方法的返回类型依然是`void`,表示没有返回值,那么`FutureTask`的`get`方法又是如何获得返回值的呢? +2. `FutureTask`的`cancel`方法能真正取消线程异步任务的执行么?什么情况下能取消? + +因为`FutureTask`异步任务执行结果还跟`Callable`接口有关,因此我们再来看下`Callable`接口: +```java +// Callable.java + +@FunctionalInterface +public interface Callable { + /** + * Computes a result, or throws an exception if unable to do so. + */ + V call() throws Exception; +} +``` +我们都知道,`Callable`接口和`Runnable`接口都可以被提交给线程池执行,唯一不同的就是`Callable`接口是有返回结果的,其中的泛型`V`就是返回结果,而`Runnable`接口是没有返回结果的。 +> **思考**: 一般情况下,`Runnable`接口实现类才能被提交给线程池执行,为何`Callable`接口实现类也可以被提交给线程池执行?想想线程池的`submit`方法内部有对`Callable`做适配么? + +# 4 FutureTask源码分析 + +## 4.1 FutureTask成员变量 +我们首先来看下`FutureTask`的成员变量有哪些,理解这些成员变量对后面的源码分析非常重要。 +```java +// FutureTask.java + +/** 封装的Callable对象,其call方法用来执行异步任务 */ +private Callable callable; +/** 在FutureTask里面定义一个成员变量outcome,用来装异步任务的执行结果 */ +private Object outcome; // non-volatile, protected by state reads/writes +/** 用来执行callable任务的线程 */ +private volatile Thread runner; +/** 线程等待节点,reiber stack的一种实现 */ +private volatile WaitNode waiters; +/** 任务执行状态 */ +private volatile int state; + +// Unsafe mechanics +private static final sun.misc.Unsafe UNSAFE; +// 对应成员变量state的偏移地址 +private static final long stateOffset; +// 对应成员变量runner的偏移地址 +private static final long runnerOffset; +// 对应成员变量waiters的偏移地址 +private static final long waitersOffset; +``` +这里我们要重点关注下`FutureTask`的`Callable`成员变量,因为`FutureTask`的异步任务最终是委托给`Callable`去实现的。 + +> **思考**: +1. `FutureTask`的成员变量`runner`,`waiters`和`state`都被`volatile`修饰,我们可以思考下为什么这三个成员变量需要被`volatile`修饰,而其他成员变量又不用呢?`volatile`关键字的作用又是什么呢? +2. 既然已经定义了成员变量`runner`,`waiters`和`state`了,此时又定义了`stateOffset`,`runnerOffset`和`waitersOffset`变量分别对应`runner`,`waiters`和`state`的偏移地址,为何要多此一举呢? + +我们再来看看`stateOffset`,`runnerOffset`和`waitersOffset`变量这三个变量的初始化过程: +```java +// FutureTask.java + +static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class k = FutureTask.class; + stateOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("state")); + runnerOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("runner")); + waitersOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("waiters")); + } catch (Exception e) { + throw new Error(e); + } + } +``` +## 4.2 FutureTask的状态变化 +前面讲了`FutureTask`的成员变量,有一个表示**状态**的成员变量`state`我们要重点关注下,`state`变量表示任务执行的状态。 +```java +// FutureTask.java + +/** 任务执行状态 */ +private volatile int state; +/** 任务新建状态 */ +private static final int NEW = 0; +/** 任务正在完成状态,是一个瞬间过渡状态 */ +private static final int COMPLETING = 1; +/** 任务正常结束状态 */ +private static final int NORMAL = 2; +/** 任务执行异常状态 */ +private static final int EXCEPTIONAL = 3; +/** 任务被取消状态,对应cancel(false) */ +private static final int CANCELLED = 4; +/** 任务中断状态,是一个瞬间过渡状态 */ +private static final int INTERRUPTING = 5; +/** 任务被中断状态,对应cancel(true) */ +private static final int INTERRUPTED = 6; +``` +可以看到任务状态变量`state`有以上7种状态,0-6分别对应着每一种状态。任务状态一开始是`NEW`,然后由`FutureTask`的三个方法`set`,`setException`和`cancel`来设置状态的变化,其中状态变化有以下四种情况: + +1. `NEW -> COMPLETING -> NORMAL`:这个状态变化表示异步任务的正常结束,其中`COMPLETING`是一个瞬间临时的过渡状态,由`set`方法设置状态的变化; +2. `NEW -> COMPLETING -> EXCEPTIONAL`:这个状态变化表示异步任务执行过程中抛出异常,由`setException`方法设置状态的变化; +3. `NEW -> CANCELLED`:这个状态变化表示被取消,即调用了`cancel(false)`,由`cancel`方法来设置状态变化; +4. `NEW -> INTERRUPTING -> INTERRUPTED`:这个状态变化表示被中断,即调用了`cancel(true)`,由`cancel`方法来设置状态变化。 + + + + + +## 4.3 FutureTask构造函数 +`FutureTask`有两个构造函数,我们分别来看看: +```java +// FutureTask.java + +// 第一个构造函数 +public FutureTask(Callable callable) { + if (callable == null) + throw new NullPointerException(); + this.callable = callable; + this.state = NEW; // ensure visibility of callable +} +``` +可以看到,这个构造函数在我们前面举的"打火锅"的例子代码中有用到,就是`Callable`成员变量赋值,在异步执行任务时再调用`Callable.call`方法执行异步任务逻辑。此外,此时给任务状态`state`赋值为`NEW`,表示任务新建状态。 + +我们再来看下`FutureTask`的另外一个构造函数: +```java +// FutureTask.java + +// 另一个构造函数 +public FutureTask(Runnable runnable, V result) { + this.callable = Executors.callable(runnable, result); + this.state = NEW; // ensure visibility of callable +} +``` +这个构造函数在执行`Executors.callable(runnable, result)`时是通过适配器`RunnableAdapter`来将`Runnable`对象`runnable`转换成`Callable`对象,然后再分别给`callable`和`state`变量赋值。 + +**注意**,这里我们需要记住的是`FutureTask`新建时,此时的任务状态`state`是`NEW`就好了。 +## 4.4 FutureTask.run方法,用来执行异步任务 +前面我们有讲到`FutureTask`间接实现了`Runnable`接口,覆写了`Runnable`接口的`run`方法,因此该覆写的`run`方法是提交给线程来执行的,同时,该`run`方法正是执行异步任务逻辑的方法,那么,执行完`run`方法又是如何保存异步任务执行的结果的呢? + +我们现在着重来分析下`run`方法: +```java +// FutureTask.java + +public void run() { + // 【1】,为了防止多线程并发执行异步任务,这里需要判断线程满不满足执行异步任务的条件,有以下三种情况: + // 1)若任务状态state为NEW且runner为null,说明还未有线程执行过异步任务,此时满足执行异步任务的条件, + // 此时同时调用CAS方法为成员变量runner设置当前线程的值; + // 2)若任务状态state为NEW且runner不为null,任务状态虽为NEW但runner不为null,说明有线程正在执行异步任务, + // 此时不满足执行异步任务的条件,直接返回; + // 1)若任务状态state不为NEW,此时不管runner是否为null,说明已经有线程执行过异步任务,此时没必要再重新 + // 执行一次异步任务,此时不满足执行异步任务的条件; + if (state != NEW || + !UNSAFE.compareAndSwapObject(this, runnerOffset, + null, Thread.currentThread())) + return; + try { + // 拿到之前构造函数传进来的callable实现类对象,其call方法封装了异步任务执行的逻辑 + Callable c = callable; + // 若任务还是新建状态的话,那么就调用异步任务 + if (c != null && state == NEW) { + // 异步任务执行结果 + V result; + // 异步任务执行成功还是始遍标志 + boolean ran; + try { + // 【2】,执行异步任务逻辑,并把执行结果赋值给result + result = c.call(); + // 若异步任务执行过程中没有抛出异常,说明异步任务执行成功,此时设置ran标志为true + ran = true; + } catch (Throwable ex) { + result = null; + // 异步任务执行过程抛出异常,此时设置ran标志为false + ran = false; + // 【3】设置异常,里面也设置state状态的变化 + setException(ex); + } + // 【3】若异步任务执行成功,此时设置异步任务执行结果,同时也设置状态的变化 + if (ran) + set(result); + } + } finally { + // runner must be non-null until state is settled to + // prevent concurrent calls to run() + // 异步任务正在执行过程中,runner一直是非空的,防止并发调用run方法,前面有调用cas方法做判断的 + // 在异步任务执行完后,不管是正常结束还是异常结束,此时设置runner为null + runner = null; + // state must be re-read after nulling runner to prevent + // leaked interrupts + // 线程执行异步任务后的任务状态 + int s = state; + // 【4】如果执行了cancel(true)方法,此时满足条件, + // 此时调用handlePossibleCancellationInterrupt方法处理中断 + if (s>= INTERRUPTING) + handlePossibleCancellationInterrupt(s); + } +} +``` +可以看到执行异步任务的`run`方法主要分为以下四步来执行: +1. **判断线程是否满足执行异步任务的条件**:为了防止多线程并发执行异步任务,这里需要判断线程满不满足执行异步任务的条件; +2. **若满足条件,执行异步任务**:因为异步任务逻辑封装在`Callable.call`方法中,此时直接调用`Callable.call`方法执行异步任务,然后返回执行结果; +3. **根据异步任务的执行情况做不同的处理**:1) 若异步任务执行正常结束,此时调用`set(result);`来设置任务执行结果;2)若异步任务执行抛出异常,此时调用`setException(ex);`来设置异常,详细分析请见`4.4.1小节`; +4. **异步任务执行完后的善后处理工作**:不管异步任务执行成功还是失败,若其他线程有调用`FutureTask.cancel(true)`,此时需要调用`handlePossibleCancellationInterrupt`方法处理中断,详细分析请见`4.4.2小节`。 + +这里**值得注意**的是判断线程满不满足执行异步任务条件时,`runner`是否为`null`是调用`UNSAFE`的`CAS`方法`compareAndSwapObject`来判断和设置的,同时`compareAndSwapObject`是通过成员变量`runner`的偏移地址`runnerOffset`来给`runner`赋值的,此外,成员变量`runner`被修饰为`volatile`是在多线程的情况下, 一个线程的`volatile`修饰变量的设值能够立即刷进主存,因此值便可被其他线程可见。 + +### 4.4.1 FutureTask的set和setException方法 +下面我们来看下当异步任务执行正常结束时,此时会调用`set(result);`方法: +```java +// FutureTask.java + +protected void set(V v) { + // 【1】调用UNSAFE的CAS方法判断任务当前状态是否为NEW,若为NEW,则设置任务状态为COMPLETING + // 【思考】此时任务不能被多线程并发执行,什么情况下会导致任务状态不为NEW? + // 答案是只有在调用了cancel方法的时候,此时任务状态不为NEW,此时什么都不需要做, + // 因此需要调用CAS方法来做判断任务状态是否为NEW + if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { + // 【2】将任务执行结果赋值给成员变量outcome + outcome = v; + // 【3】将任务状态设置为NORMAL,表示任务正常结束 + UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state + // 【4】调用任务执行完成方法,此时会唤醒阻塞的线程,调用done()方法和清空等待线程链表等 + finishCompletion(); + } +} +``` +可以看到当异步任务正常执行结束后,且异步任务没有被`cancel`的情况下,此时会做以下事情:将任务执行结果保存到`FutureTask`的成员变量`outcome`中的,赋值结束后会调用`finishCompletion`方法来唤醒阻塞的线程(哪里来的阻塞线程?后面会分析),**值得注意**的是这里对应的任务状态变化是**NEW -> COMPLETING -> NORMAL**。 + + +我们继续来看下当异步任务执行过程中抛出异常,此时会调用`setException(ex);`方法。 +```java +// FutureTask.java + +protected void setException(Throwable t) { + // 【1】调用UNSAFE的CAS方法判断任务当前状态是否为NEW,若为NEW,则设置任务状态为COMPLETING + // 【思考】此时任务不能被多线程并发执行,什么情况下会导致任务状态不为NEW? + // 答案是只有在调用了cancel方法的时候,此时任务状态不为NEW,此时什么都不需要做, + // 因此需要调用CAS方法来做判断任务状态是否为NEW + if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { + // 【2】将异常赋值给成员变量outcome + outcome = t; + // 【3】将任务状态设置为EXCEPTIONAL + UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state + // 【4】调用任务执行完成方法,此时会唤醒阻塞的线程,调用done()方法和清空等待线程链表等 + finishCompletion(); + } +} +``` +可以看到`setException(Throwable t)`的代码逻辑跟前面的`set(V v)`几乎一样,不同的是任务执行过程中抛出异常,此时是将异常保存到`FutureTask`的成员变量`outcome`中,还有,**值得注意**的是这里对应的任务状态变化是**NEW -> COMPLETING -> EXCEPTIONAL**。 + +因为异步任务不管正常还是异常结束,此时都会调用`FutureTask`的`finishCompletion`方法来唤醒唤醒阻塞的线程,这里阻塞的线程是指我们调用`Future.get`方法时若异步任务还未执行完,此时该线程会阻塞。 +```java +// FutureTask.java + +private void finishCompletion() { + // assert state> COMPLETING; + // 取出等待线程链表头节点,判断头节点是否为null + // 1)若线程链表头节点不为空,此时以"后进先出"的顺序(栈)移除等待的线程WaitNode节点 + // 2)若线程链表头节点为空,说明还没有线程调用Future.get()方法来获取任务执行结果,固然不用移除 + for (WaitNode q; (q = waiters) != null;) { + // 调用UNSAFE的CAS方法将成员变量waiters设置为空 + if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { + for (;;) { + // 取出WaitNode节点的线程 + Thread t = q.thread; + // 若取出的线程不为null,则将该WaitNode节点线程置空,且唤醒正在阻塞的该线程 + if (t != null) { + q.thread = null; + //【重要】唤醒正在阻塞的该线程 + LockSupport.unpark(t); + } + // 继续取得下一个WaitNode线程节点 + WaitNode next = q.next; + // 若没有下一个WaitNode线程节点,说明已经将所有等待的线程唤醒,此时跳出for循环 + if (next == null) + break; + // 将已经移除的线程WaitNode节点的next指针置空,此时好被垃圾回收 + q.next = null; // unlink to help gc + // 再把下一个WaitNode线程节点置为当前线程WaitNode头节点 + q = next; + } + break; + } + } + // 不管任务正常执行还是抛出异常,都会调用done方法 + done(); + // 因为异步任务已经执行完且结果已经保存到outcome中,因此此时可以将callable对象置空了 + callable = null; // to reduce footprint +} +``` +`finishCompletion`方法的作用就是不管异步任务正常还是异常结束,此时都要唤醒且移除线程等待链表的等待线程节点,这个链表实现的是一个是`Treiber stack`,因此唤醒(移除)的顺序是"后进先出"即后面先来的线程先被先唤醒(移除),关于这个线程等待链表是如何成链的,后面再继续分析。 + +### 4.4.2 FutureTask的handlePossibleCancellationInterrupt方法 +在`4.4小节`分析的`run`方法里的最后有一个`finally`块,此时若任务状态`state>= INTERRUPTING`,此时说明有其他线程执行了`cancel(true)`方法,此时需要让出`CPU`执行的时间片段给其他线程执行,我们来看下具体的源码: +```java +// FutureTask.java + +private void handlePossibleCancellationInterrupt(int s) { + // It is possible for our interrupter to stall before getting a + // chance to interrupt us. Let's spin-wait patiently. + // 当任务状态是INTERRUPTING时,此时让出CPU执行的机会,让其他线程执行 + if (s == INTERRUPTING) + while (state == INTERRUPTING) + Thread.yield(); // wait out pending interrupt + + // assert state == INTERRUPTED; + + // We want to clear any interrupt we may have received from + // cancel(true). However, it is permissible to use interrupts + // as an independent mechanism for a task to communicate with + // its caller, and there is no way to clear only the + // cancellation interrupt. + // + // Thread.interrupted(); +} +``` +> **思考**: 为啥任务状态是`INTERRUPTING`时,此时就要让出CPU执行的时间片段呢?还有为什么要在义务任务执行后才调用`handlePossibleCancellationInterrupt`方法呢? + +## 4.5 FutureTask.get方法,获取任务执行结果 + +```java +前面我们起一个线程在其`run`方法中执行异步任务后,此时我们可以调用`FutureTask.get`方法来获取异步任务执行的结果。 + +// FutureTask.java + +public V get() throws InterruptedException, ExecutionException { + int s = state; + // 【1】若任务状态<=completing,说明任务正在执行过程中,此时可能正常结束,也可能遇到异常 + if (s <= COMPLETING) + s = awaitDone(false, 0L); + // 【2】最后根据任务状态来返回任务执行结果,此时有三种情况:1)任务正常执行;2)任务执行异常;3)任务被取消 + return report(s); +} +``` +可以看到,如果任务状态`state<=completing`,说明异步任务正在执行过程中,此时会调用`awaitdone`方法阻塞等待;当任务执行完后,此时再调用`report`方法来报告任务结果,此时有三种情况:1)任务正常执行;2)任务执行异常;3)任务被取消。 + +### 4.5.1 FutureTask.awaitDone方法 +`FutureTask.awaitDone`方法会阻塞获取异步任务执行结果的当前线程,直到异步任务执行完成。 +```java +// FutureTask.java + +private int awaitDone(boolean timed, long nanos) + throws InterruptedException { + // 计算超时结束时间 + final long deadline = timed ? System.nanoTime() + nanos : 0L; + // 线程链表头节点 + WaitNode q = null; + // 是否入队 + boolean queued = false; + // 死循环 + for (;;) { + // 如果当前获取任务执行结果的线程被中断,此时移除该线程WaitNode链表节点,并抛出InterruptedException + if (Thread.interrupted()) { + removeWaiter(q); + throw new InterruptedException(); + } + + int s = state; + // 【5】如果任务状态>COMPLETING,此时返回任务执行结果,其中此时任务可能正常结束(NORMAL),可能抛出异常(EXCEPTIONAL) + // 或任务被取消(CANCELLED,INTERRUPTING或INTERRUPTED状态的一种) + if (s> COMPLETING) { + // 【问】此时将当前WaitNode节点的线程置空,其中在任务结束时也会调用finishCompletion将WaitNode节点的thread置空, + // 这里为什么又要再调用一次q.thread = null;呢? + // 【答】因为若很多线程来获取任务执行结果,在任务执行完的那一刻,此时获取任务的线程要么已经在线程等待链表中,要么 + // 此时还是一个孤立的WaitNode节点。在线程等待链表中的的所有WaitNode节点将由finishCompletion来移除(同时唤醒)所有 + // 等待的WaitNode节点,以便垃圾回收;而孤立的线程WaitNode节点此时还未阻塞,因此不需要被唤醒,此时只要把其属性置为 + // null,然后其有没有被谁引用,因此可以被GC。 + if (q != null) + q.thread = null; + // 【重要】返回任务执行结果 + return s; + } + // 【4】若任务状态为COMPLETING,此时说明任务正在执行过程中,此时获取任务结果的线程需让出CPU执行时间片段 + else if (s == COMPLETING) // cannot time out yet + Thread.yield(); + // 【1】若当前线程还没有进入线程等待链表的WaitNode节点,此时新建一个WaitNode节点,并把当前线程赋值给WaitNode节点的thread属性 + else if (q == null) + q = new WaitNode(); + // 【2】若当前线程等待节点还未入线程等待队列,此时加入到该线程等待队列的头部 + else if (!queued) + queued = UNSAFE.compareAndSwapObject(this, waitersOffset, + q.next = waiters, q); + // 若有超时设置,那么处理超时获取任务结果的逻辑 + else if (timed) { + nanos = deadline - System.nanoTime(); + if (nanos <= 0L) { + removeWaiter(q); + return state; + } + LockSupport.parkNanos(this, nanos); + } + // 【3】若没有超时设置,此时直接阻塞当前线程 + else + LockSupport.park(this); + } +} +``` +`FutureTask.awaitDone`方法主要做的事情总结如下: + +0. 首先`awaitDone`方法里面是一个死循环; +1. 若获取结果的当前线程被其他线程中断,此时移除该线程WaitNode链表节点,并抛出InterruptedException; +2. 如果任务状态`state>COMPLETING`,此时返回任务执行结果; +3. 若任务状态为`COMPLETING`,此时获取任务结果的线程需让出CPU执行时间片段; +4. 若`q == null`,说明当前线程还未设置到`WaitNode`节点,此时新建`WaitNode`节点并设置其`thread`属性为当前线程; +5. 若`queued==false`,说明当前线程`WaitNode`节点还未加入线程等待链表,此时加入该链表的头部; +6. 当`timed`设置为true时,此时该方法具有超时功能,关于超时的逻辑这里不详细分析; +7. 当前面6个条件都不满足时,此时阻塞当前线程。 + +我们分析到这里,可以直到执行异步任务只能有一个线程来执行,而获取异步任务结果可以多线程来获取,当异步任务还未执行完时,此时获取异步任务结果的线程会加入线程等待链表中,然后调用调用`LockSupport.park(this);`方法阻塞当前线程。直到异步任务执行完成,此时会调用`finishCompletion`方法来唤醒并移除线程等待链表的每个`WaitNode`节点,这里这里唤醒(移除)`WaitNode`节点的线程是从链表头部开始的,前面我们也已经分析过。 + +还有一个特别需要注意的就是`awaitDone`方法里面是一个死循环,当一个获取异步任务的线程进来后可能会多次进入多个条件分支执行不同的业务逻辑,也可能只进入一个条件分支。下面分别举两种可能的情况进行说明: + +**情况1**: +当获取异步任务结果的线程进来时,此时异步任务还未执行完即`state=NEW`且没有超时设置时: +1. **第一次循环**:此时`q = null`,此时进入上面代码标号`【1】`的判断分支,即为当前线程新建一个`WaitNode`节点; +2. **第二次循环**:此时`queued = false`,此时进入上面代码标号`【2】`的判断分支,即将之前新建的`WaitNode`节点加入线程等待链表中; +3. **第三次循环**:此时进入上面代码标号`【3】`的判断分支,即阻塞当前线程; +4. **第四次循环**:加入此时异步任务已经执行完,此时进入上面代码标号`【5】`的判断分支,即返回异步任务执行结果。 + +**情况2**: +当获取异步任务结果的线程进来时,此时异步任务已经执行完即`state>COMPLETING`且没有超时设置时,此时直接进入上面代码标号`【5】`的判断分支,即直接返回异步任务执行结果即可,也不用加入线程等待链表了。 +### 4.5.2 FutureTask.report方法 +在`get`方法中,当异步任务执行结束后即不管异步任务正常还是异常结束,亦或是被`cancel`,此时获取异步任务结果的线程都会被唤醒,因此会继续执行`FutureTask.report`方法报告异步任务的执行情况,此时可能会返回结果,也可能会抛出异常。 +```java +// FutureTask.java + +private V report(int s) throws ExecutionException { + // 将异步任务执行结果赋值给x,此时FutureTask的成员变量outcome要么保存着 + // 异步任务正常执行的结果,要么保存着异步任务执行过程中抛出的异常 + Object x = outcome; + // 【1】若异步任务正常执行结束,此时返回异步任务执行结果即可 + if (s == NORMAL) + return (V)x; + // 【2】若异步任务执行过程中,其他线程执行过cancel方法,此时抛出CancellationException异常 + if (s>= CANCELLED) + throw new CancellationException(); + // 【3】若异步任务执行过程中,抛出异常,此时将该异常转换成ExecutionException后,重新抛出。 + throw new ExecutionException((Throwable)x); +} +``` +## 4.6 FutureTask.cancel方法,取消执行任务 +我们最后再来看下`FutureTask.cancel`方法,我们一看到`FutureTask.cancel`方法,肯定一开始就天真的认为这是一个可以取消异步任务执行的方法,如果我们这样认为的话,只能说我们猜对了一半。 +```java +// FutureTask.java + +public boolean cancel(boolean mayInterruptIfRunning) { + // 【1】判断当前任务状态,若state == NEW时根据mayInterruptIfRunning参数值给当前任务状态赋值为INTERRUPTING或CANCELLED + // a)当任务状态不为NEW时,说明异步任务已经完成,或抛出异常,或已经被取消,此时直接返回false。 + // TODO 【问题】此时若state = COMPLETING呢?此时为何也直接返回false,而不能发出中断异步任务线程的中断信号呢?? + // TODO 仅仅因为COMPLETING是一个瞬时态吗??? + // b)当前仅当任务状态为NEW时,此时若mayInterruptIfRunning为true,此时任务状态赋值为INTERRUPTING;否则赋值为CANCELLED。 + if (!(state == NEW && + UNSAFE.compareAndSwapInt(this, stateOffset, NEW, + mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) + return false; + try { // in case call to interrupt throws exception + // 【2】如果mayInterruptIfRunning为true,此时中断执行异步任务的线程runner(还记得执行异步任务时就把执行异步任务的线程就赋值给了runner成员变量吗) + if (mayInterruptIfRunning) { + try { + Thread t = runner; + if (t != null) + // 中断执行异步任务的线程runner + t.interrupt(); + } finally { // final state + // 最后任务状态赋值为INTERRUPTED + UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); + } + } + // 【3】不管mayInterruptIfRunning为true还是false,此时都要调用finishCompletion方法唤醒阻塞的获取异步任务结果的线程并移除线程等待链表节点 + } finally { + finishCompletion(); + } + // 返回true + return true; +} +``` +以上代码中,当异步任务状态`state != NEW`时,说明异步任务已经正常执行完或已经异常结束亦或已经被`cancel`,此时直接返回`false`;当异步任务状态`state = NEW`时,此时又根据`mayInterruptIfRunning`参数是否为`true`分为以下两种情况: + +1. 当`mayInterruptIfRunning = false`时,此时任务状态`state`直接被赋值为`CANCELLED`,此时不会对执行异步任务的线程发出中断信号,**值得注意**的是这里对应的任务状态变化是**NEW -> CANCELLED**。 +2. 当`mayInterruptIfRunning = true`时,此时会对执行异步任务的线程发出中断信号,**值得注意**的是这里对应的任务状态变化是**NEW -> INTERRUPTING -> INTERRUPTED**。 + +最后不管`mayInterruptIfRunning`为`true`还是`false`,此时都要调用`finishCompletion`方法唤醒阻塞的获取异步任务结果的线程并移除线程等待链表节点。 + +从`FutureTask.cancel`源码中我们可以得出答案,该方法并不能真正中断正在执行异步任务的线程,只能对执行异步任务的线程发出中断信号。如果执行异步任务的线程处于`sleep`、`wait`或`join`的状态中,此时会抛出`InterruptedException`异常,该线程可以被中断;此外,如果异步任务需要在`while`循环执行的话,此时可以结合以下代码来结束异步任务线程,即执行异步任务的线程被中断时,此时`Thread.currentThread().isInterrupted()`返回`true`,不满足`while`循环条件因此退出循环,结束异步任务执行线程,如下代码: +``` +public Integer call() throws Exception { + while (!Thread.currentThread().isInterrupted()) { + // 业务逻辑代码 + System.out.println("running..."); + + } + return 666; +} +``` + +**注意**:调用了`FutureTask.cancel`方法,只要返回结果是`true`,假如异步任务线程虽然不能被中断,即使异步任务线程正常执行完毕,返回了执行结果,此时调用`FutureTask.get`方法也不能够获取异步任务执行结果,此时会抛出`CancellationException`异常。请问知道这是为什么吗? + +因为调用了`FutureTask.cancel`方法,只要返回结果是`true`,此时的任务状态为`CANCELLED`或`INTERRUPTED`,同时必然会执行`finishCompletion`方法,而`finishCompletion`方法会唤醒获取异步任务结果的线程等待列表的线程,而获取异步任务结果的线程唤醒后发现状态`s>= CANCELLED`,此时就会抛出`CancellationException`异常了。 + +# 5 总结 + +好了,本篇文章对`FutureTask`的源码分析就到此结束了,下面我们再总结下`FutureTask`的实现逻辑: +1. 我们实现`Callable`接口,在覆写的`call`方法中定义需要执行的业务逻辑; +2. 然后把我们实现的`Callable`接口实现对象传给`FutureTask`,然后`FutureTask`作为异步任务提交给线程执行; +3. 最重要的是`FutureTask`内部维护了一个状态`state`,任何操作(异步任务正常结束与否还是被取消)都是围绕着这个状态进行,并随时更新`state`任务的状态; +4. 只能有一个线程执行异步任务,当异步任务执行结束后,此时可能正常结束,异常结束或被取消。 +5. 可以多个线程并发获取异步任务执行结果,当异步任务还未执行完,此时获取异步任务的线程将加入线程等待列表进行等待; +6. 当异步任务线程执行结束后,此时会唤醒获取异步任务执行结果的线程,注意唤醒顺序是"后进先出"即后面加入的阻塞线程先被唤醒。 +7. 当我们调用`FutureTask.cancel`方法时并不能真正停止执行异步任务的线程,只是发出中断线程的信号。但是只要`cancel`方法返回`true`,此时即使异步任务能正常执行完,此时我们调用`get`方法获取结果时依然会抛出`CancellationException`异常。 + +> **扩展**: 前面我们提到了`FutureTask`的`runner`,`waiters`和`state`都是用`volatile`关键字修饰,说明这三个变量都是多线程共享的对象(成员变量),会被多线程操作,此时用`volatile`关键字修饰是为了一个线程操作`volatile`属性变量值后,能够及时对其他线程可见。此时多线程操作成员变量仅仅用了`volatile`关键字仍然会有线程安全问题的,而此时Doug Lea老爷子没有引入任何线程锁,而是采用了`Unsafe`的`CAS`方法来代替锁操作,确保线程安全性。 + +# 6 分析FutureTask源码,我们能学到什么? + +我们分析源码的目的是什么?除了弄懂`FutureTask`的内部实现原理外,我们还要借鉴大佬写写框架源码的各种技巧,只有这样,我们才能成长。 + +分析了`FutureTask`源码,我们可以从中学到: +1. 利用`LockSupport`来实现线程的阻塞\唤醒机制; +2. 利用`volatile`和`UNSAFE`的`CAS`方法来实现线程共享变量的无锁化操作; +3. 若要编写超时异常的逻辑可以参考`FutureTask`的`get(long timeout, TimeUnit unit)`的实现逻辑; +4. 多线程获取某一成员变量结果时若需要等待时的线程等待链表的逻辑实现; +5. 某一异步任务在某一时刻只能由单一线程执行的逻辑实现; +6. `FutureTask`中的任务状态`satate`的变化处理的逻辑实现。 +7. ... + +以上列举的几点都是我们可以学习参考的地方。 + + +**若您觉得不错,请无情的转发和点赞吧!** + +【源码笔记】Github地址: + +https://github.com/yuanmabiji/Java-SourceCode-Blogs + +------------------------------------------------------------------------------- +公众号【源码笔记】,专注于Java后端系列框架的源码分析。 + +![](https://user-gold-cdn.xitu.io/2020/6/26/172ec7a63ebdf26f?w=498&h=143&f=png&s=23420) \ No newline at end of file diff --git a/JUC/README.md b/JUC/README.md new file mode 100644 index 0000000..76e64ae --- /dev/null +++ b/JUC/README.md @@ -0,0 +1,16 @@ + +【**源码笔记**】专注于Java后端系列框架的源码分析。若觉得源码分析文章不错,欢迎Star哦。 + + +================**JUC源码专题持续更新中...**==================== + +#### 目录 + +1. [Java是如何实现Future模式的?万字详解!](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/JUC/Java%E6%98%AF%E5%A6%82%E4%BD%95%E5%AE%9E%E7%8E%B0Future%E6%A8%A1%E5%BC%8F%E7%9A%84%EF%BC%9F%E4%B8%87%E5%AD%97%E8%AF%A6%E8%A7%A3%EF%BC%81.md) +2. 持续更新中... + +* 更多源码分析文章请跳转至:https://github.com/yuanmabiji/Java-SourceCode-Blogs + + + + diff --git a/README.md b/README.md index 4f3d71b..a8057aa 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,12 @@ 【**源码笔记**】专注于Java后端系列框架的源码分析。若觉得源码分析文章不错,欢迎Star哦。 -由于【源码笔记】今年2月初才开始写源码分析文章,因此目前源码分析文章还不是很多,计划每周持续推出一到两篇Java后端框架源码系列的文章,随着时间的积累,Java后端源码分析文章肯定会越来越多,越来越丰富哦,敬请关注。 +公众号: + +![img_1.png](https://common-ymbj.oss-cn-beijing.aliyuncs.com/wxgzh_qrcode.PNG) + +**温馨提示**:github上前期文章图片失效了,如果想阅读前期文章的话还请移步公众号阅读哦。 + +【源码笔记】计划每周持续推出一篇Java后端框架源码系列的文章,随着时间的积累,Java后端源码分析文章肯定会越来越多,越来越丰富哦,敬请关注。 ### 目录 @@ -8,6 +14,19 @@ 1. [跟大家聊聊我们为什么要学习源码?学习源码对我们有用吗?](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/%E8%B7%9F%E5%A4%A7%E5%AE%B6%E8%81%8A%E8%81%8A%E6%88%91%E4%BB%AC%E4%B8%BA%E4%BB%80%E4%B9%88%E8%A6%81%E5%AD%A6%E4%B9%A0%E6%BA%90%E7%A0%81%EF%BC%9F%E5%AD%A6%E4%B9%A0%E6%BA%90%E7%A0%81%E5%AF%B9%E6%88%91%E4%BB%AC%E6%9C%89%E7%94%A8%E5%90%97%EF%BC%9F.md) 2. [分析开源项目源码,我们该如何入手分析?(授人以渔)](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/%E5%88%86%E6%9E%90%E5%BC%80%E6%BA%90%E9%A1%B9%E7%9B%AE%E6%BA%90%E7%A0%81%EF%BC%8C%E6%88%91%E4%BB%AC%E8%AF%A5%E5%A6%82%E4%BD%95%E5%85%A5%E6%89%8B%E5%88%86%E6%9E%90%EF%BC%9F%EF%BC%88%E6%8E%88%E4%BA%BA%E4%BB%A5%E6%B8%94%EF%BC%89.md) +================**Disruptor源码专题持续更新中...**==================== +1. [初识Disruptor框架!](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/Disruptor/初识Disruptor框架.md) +2. [Disruptor广播模式与执行顺序链源码分析](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/Disruptor/Disruptor广播模式与执行顺序链源码分析.md) +3. 持续更新中... +* Disruptor源码分析专题:https://github.com/yuanmabiji/Java-SourceCode-Blogs/tree/master/Disruptor +* Disruptor源码解析项目(带中文注释):https://github.com/yuanmabiji/disruptor + +================**JUC源码专题持续更新中...**==================== +1. [Java是如何实现Future模式的?万字详解!](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/JUC/Java%E6%98%AF%E5%A6%82%E4%BD%95%E5%AE%9E%E7%8E%B0Future%E6%A8%A1%E5%BC%8F%E7%9A%84%EF%BC%9F%E4%B8%87%E5%AD%97%E8%AF%A6%E8%A7%A3%EF%BC%81.md) +2. 持续更新中... +* JUC源码分析专题:https://github.com/yuanmabiji/Java-SourceCode-Blogs/tree/master/JUC +* JUC源码解析项目(带中文注释):https://github.com/yuanmabiji/jdk1.8-sourcecode-blogs + ================**SpringBoot源码专题持续更新中...**==================== 1. [如何搭建自己的SpringBoot源码调试环境? SpringBoot源码(一)](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/SpringBoot/1%20%E5%A6%82%E4%BD%95%E6%90%AD%E5%BB%BA%E8%87%AA%E5%B7%B1%E7%9A%84SpringBoot%E6%BA%90%E7%A0%81%E8%B0%83%E8%AF%95%E7%8E%AF%E5%A2%83%EF%BC%9F%20%20SpringBoot%E6%BA%90%E7%A0%81%EF%BC%88%E4%B8%80%EF%BC%89.md) 2. [如何分析SpringBoot源码模块及结构? SpringBoot源码(二)](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/SpringBoot/2%20%E5%A6%82%E4%BD%95%E5%88%86%E6%9E%90SpringBoot%E6%BA%90%E7%A0%81%E6%A8%A1%E5%9D%97%E5%8F%8A%E7%BB%93%E6%9E%84%EF%BC%9F%20%20SpringBoot%E6%BA%90%E7%A0%81%EF%BC%88%E4%BA%8C%EF%BC%89.md) @@ -18,7 +37,8 @@ 7. [SpringBoot的启动流程是怎样的?SpringBoot源码(七)](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/SpringBoot/7%20SpringBoot%E7%9A%84%E5%90%AF%E5%8A%A8%E6%B5%81%E7%A8%8B%E6%98%AF%E6%80%8E%E6%A0%B7%E7%9A%84%EF%BC%9FSpringBoot%E6%BA%90%E7%A0%81%EF%BC%88%E4%B8%83%EF%BC%89.md) 8. [SpringApplication对象是如何构建的? SpringBoot源码(八)](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/SpringBoot/8%20SpringApplication%E5%AF%B9%E8%B1%A1%E6%98%AF%E5%A6%82%E4%BD%95%E6%9E%84%E5%BB%BA%E7%9A%84%EF%BC%9F%20SpringBoot%E6%BA%90%E7%A0%81%EF%BC%88%E5%85%AB%EF%BC%89.md) 9. [SpringBoot事件监听机制源码分析(上) SpringBoot源码(九)](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/SpringBoot/9%20SpringBoot%E4%BA%8B%E4%BB%B6%E7%9B%91%E5%90%AC%E6%9C%BA%E5%88%B6%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90(%E4%B8%8A)%20SpringBoot%E6%BA%90%E7%A0%81(%E4%B9%9D).md) -10. 持续更新中... +10. [SpringBoot内置生命周期事件详解 SpringBoot源码(十)](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/SpringBoot/10%20SpringBoot%E5%86%85%E7%BD%AE%E7%94%9F%E5%91%BD%E5%91%A8%E6%9C%9F%E4%BA%8B%E4%BB%B6%E8%AF%A6%E8%A7%A3%20%20SpringBoot%E6%BA%90%E7%A0%81(%E5%8D%81).md) +11. 持续更新中... * SpringBoot源码分析专题:https://github.com/yuanmabiji/Java-SourceCode-Blogs/tree/master/SpringBoot * SpringBoot源码解析项目(带中文注释):https://github.com/yuanmabiji/spring-boot-2.1.0.RELEASE @@ -35,7 +55,8 @@ 1. [Java是如何实现自己的SPI机制的? JDK源码(一)](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/JDK/1%20Java%E6%98%AF%E5%A6%82%E4%BD%95%E5%AE%9E%E7%8E%B0%E8%87%AA%E5%B7%B1%E7%9A%84SPI%E6%9C%BA%E5%88%B6%E7%9A%84%EF%BC%9F%20JDK%E6%BA%90%E7%A0%81%EF%BC%88%E4%B8%80%EF%BC%89.md) 2. 持续更新中... * JDK源码分析专题:https://github.com/yuanmabiji/Java-SourceCode-Blogs/tree/master/JDK -* JDK源码解析项目(带中文注释):待提供 +* JDK源码解析项目(带中文注释):https://github.com/yuanmabiji/jdk1.8-sourcecode-blogs + ================**TODO LIST**==================== @@ -50,21 +71,5 @@ * Seata * JUC * Zookeeper -* ...... - --------------------------------------------- -微信搜: - -**公众号:源码笔记** - -![img](https://common-ymbj.oss-cn-beijing.aliyuncs.com/%E6%BA%90%E7%A0%81%E7%AC%94%E8%AE%B0%E5%85%AC%E4%BC%97%E5%8F%B7%E4%BA%8C%E7%BB%B4%E7%A0%81.PNG) - -**联系我:hardwork-persistence** - -###### - - - - +* ..... -![img](https://common-ymbj.oss-cn-beijing.aliyuncs.com/%E7%88%B1%E7%BC%96%E7%A0%81%E7%9A%84%E7%A0%81%E5%86%9C%E4%BA%8C%E7%BB%B4%E7%A0%81.PNG) diff --git "a/SpringBoot/10 SpringBoot345円206円205円347円275円256円347円224円237円345円221円275円345円221円250円346円234円237円344円272円213円344円273円266円350円257円246円350円247円243円 SpringBoot346円272円220円347円240円201円(345円215円201円).md" "b/SpringBoot/10 SpringBoot345円206円205円347円275円256円347円224円237円345円221円275円345円221円250円346円234円237円344円272円213円344円273円266円350円257円246円350円247円243円 SpringBoot346円272円220円347円240円201円(345円215円201円).md" new file mode 100644 index 0000000..6d2aec5 --- /dev/null +++ "b/SpringBoot/10 SpringBoot345円206円205円347円275円256円347円224円237円345円221円275円345円221円250円346円234円237円344円272円213円344円273円266円350円257円246円350円247円243円 SpringBoot346円272円220円347円240円201円(345円215円201円).md" @@ -0,0 +1,351 @@ +**SpringBoot中文注释项目Github地址:** + +https://github.com/yuanmabiji/spring-boot-2.1.0.RELEASE + + + +本篇接 [SpringBoot事件监听机制源码分析(上) SpringBoot源码(九)](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/SpringBoot/9%20SpringBoot%E4%BA%8B%E4%BB%B6%E7%9B%91%E5%90%AC%E6%9C%BA%E5%88%B6%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90(%E4%B8%8A)%20SpringBoot%E6%BA%90%E7%A0%81(%E4%B9%9D).md) + +# 1 温故而知新 +温故而知新,我们来简单回顾一下上篇的内容,上一篇我们分析了**SpringBoot启动时广播生命周期事件的原理**,现将关键步骤再浓缩总结下: + +1. 为广播SpringBoot内置生命周期事件做前期准备:1)首先加载`ApplicationListener`监听器实现类;2)其次加载SPI扩展类`EventPublishingRunListener`。 +2. SpringBoot启动时利用`EventPublishingRunListener`广播生命周期事件,然后`ApplicationListener`监听器实现类监听相应的生命周期事件执行一些初始化逻辑的工作。 +# 2 引言 +上篇文章的侧重点是分析了SpringBoot启动时广播生命周期事件的原理,此篇文章我们再来详细分析SpringBoot内置的7种生命周期事件的源码。 +# 3 SpringBoot生命周期事件源码分析 +分析SpringBoot的生命周期事件,我们先来看一张类结构图: +![](https://user-gold-cdn.xitu.io/2020/5/2/171d3520a8eec9ee?w=1172&h=626&f=png&s=56346) +由上图可以看到事件类之间的关系: +1. 最顶级的父类是JDK的事件基类`EventObject`; +2. 然后Spring的事件基类`ApplicationEvent`继承了JDK的事件基类`EventObject`; +3. 其次SpringBoot的生命周期事件基类`SpringApplicationEvent`继承了Spring的事件基类`ApplicationEvent`; +4. 最后SpringBoot具体的7个生命周期事件类再继承了SpringBoot的生命周期事件基类`SpringApplicationEvent`。 + +# 3.1 JDK的事件基类EventObject +`EventObject`类是JDK的事件基类,可以说是所有Java事件类的基本,即所有的Java事件类都直接或间接继承于该类,源码如下: +```java +// EventObject.java + +public class EventObject implements java.io.Serializable { + + private static final long serialVersionUID = 5516075349620653480L; + + /** + * The object on which the Event initially occurred. + */ + protected transient Object source; + /** + * Constructs a prototypical Event. + * + * @param source The object on which the Event initially occurred. + * @exception IllegalArgumentException if source is null. + */ + public EventObject(Object source) { + if (source == null) + throw new IllegalArgumentException("null source"); + this.source = source; + } + /** + * The object on which the Event initially occurred. + * + * @return The object on which the Event initially occurred. + */ + public Object getSource() { + return source; + } + /** + * Returns a String representation of this EventObject. + * + * @return A a String representation of this EventObject. + */ + public String toString() { + return getClass().getName() + "[source=" + source + "]"; + } +} +``` +可以看到`EventObject`类只有一个属性`source`,这个属性是用来记录最初事件是发生在哪个类,举个栗子,比如在SpringBoot启动过程中会发射`ApplicationStartingEvent`事件,而这个事件最初是在`SpringApplication`类中发射的,因此`source`就是`SpringApplication`对象。 +# 3.2 Spring的事件基类ApplicationEvent +`ApplicationEvent`继承了DK的事件基类`EventObject`类,是Spring的事件基类,被所有Spring的具体事件类继承,源码如下: +```java +// ApplicationEvent.java + +/** + * Class to be extended by all application events. Abstract as it + * doesn't make sense for generic events to be published directly. + * + * @author Rod Johnson + * @author Juergen Hoeller + */ +public abstract class ApplicationEvent extends EventObject { + /** use serialVersionUID from Spring 1.2 for interoperability. */ + private static final long serialVersionUID = 7099057708183571937L; + /** System time when the event happened. */ + private final long timestamp; + /** + * Create a new ApplicationEvent. + * @param source the object on which the event initially occurred (never {@code null}) + */ + public ApplicationEvent(Object source) { + super(source); + this.timestamp = System.currentTimeMillis(); + } + /** + * Return the system time in milliseconds when the event happened. + */ + public final long getTimestamp() { + return this.timestamp; + } +} +``` +可以看到`ApplicationEvent`有且仅有一个属性`timestamp`,该属性是用来记录事件发生的时间。 +# 3.3 SpringBoot的事件基类SpringApplicationEvent +`SpringApplicationEvent`类继承了Spring的事件基类`ApplicationEvent`,是所有SpringBoot内置生命周期事件的父类,源码如下: +```java + +/** + * Base class for {@link ApplicationEvent} related to a {@link SpringApplication}. + * + * @author Phillip Webb + */ +@SuppressWarnings("serial") +public abstract class SpringApplicationEvent extends ApplicationEvent { + private final String[] args; + public SpringApplicationEvent(SpringApplication application, String[] args) { + super(application); + this.args = args; + } + public SpringApplication getSpringApplication() { + return (SpringApplication) getSource(); + } + public final String[] getArgs() { + return this.args; + } +} +``` +可以看到`SpringApplicationEvent`有且仅有一个属性`args`,该属性就是SpringBoot启动时的命令行参数即标注`@SpringBootApplication`启动类中`main`函数的参数。 +# 3.4 SpringBoot具体的生命周期事件类 +接下来我们再来看一下`SpringBoot`内置生命周期事件即`SpringApplicationEvent`的具体子类们。 +# 3.4.1 ApplicationStartingEvent + +```java +// ApplicationStartingEvent.java + +public class ApplicationStartingEvent extends SpringApplicationEvent { + public ApplicationStartingEvent(SpringApplication application, String[] args) { + super(application, args); + } +} +``` +SpringBoot开始启动时便会发布`ApplicationStartingEvent`事件,其发布时机在环境变量Environment或容器ApplicationContext创建前但在注册`ApplicationListener`具体监听器之后,标志标志`SpringApplication`开始启动。 +# 3.4.2 ApplicationEnvironmentPreparedEvent +```java +// ApplicationEnvironmentPreparedEvent.java + +public class ApplicationEnvironmentPreparedEvent extends SpringApplicationEvent { + private final ConfigurableEnvironment environment; + /** + * Create a new {@link ApplicationEnvironmentPreparedEvent} instance. + * @param application the current application + * @param args the arguments the application is running with + * @param environment the environment that was just created + */ + public ApplicationEnvironmentPreparedEvent(SpringApplication application, + String[] args, ConfigurableEnvironment environment) { + super(application, args); + this.environment = environment; + } + /** + * Return the environment. + * @return the environment + */ + public ConfigurableEnvironment getEnvironment() { + return this.environment; + } +} +``` +可以看到`ApplicationEnvironmentPreparedEvent`事件多了一个`environment`属性,我们不妨想一下,多了`environment`属性的作用是啥? +答案就是`ApplicationEnvironmentPreparedEvent`事件的`environment`属性作用是利用事件发布订阅机制,相应监听器们可以从`ApplicationEnvironmentPreparedEvent`事件中取出`environment`变量,然后我们可以为`environment`属性增加属性值或读出`environment`变量中的值。 +> **举个栗子:** `ConfigFileApplicationListener`监听器就是监听了`ApplicationEnvironmentPreparedEvent`事件,然后取出`ApplicationEnvironmentPreparedEvent`事件的`environment`属性,然后再为`environment`属性增加`application.properties`配置文件中的环境变量值。 + +当SpringApplication已经开始启动且环境变量`Environment`已经创建后,并且为环境变量`Environment`配置了命令行和`Servlet`等类型的环境变量后,此时会发布`ApplicationEnvironmentPreparedEvent`事件。 + +监听`ApplicationEnvironmentPreparedEvent`事件的第一个监听器是`ConfigFileApplicationListener`,因为是`ConfigFileApplicationListener`监听器还要为环境变量`Environment`增加`application.properties`配置文件中的环境变量;此后还有一些也是监听`ApplicationEnvironmentPreparedEvent`事件的其他监听器监听到此事件时,此时可以说环境变量`Environment`几乎已经完全准备好了。 +> **思考:** 监听同一事件的监听器们执行监听逻辑时是有顺序的,我们可以想一下这个排序逻辑是什么时候排序的?还有为什么要这样排序呢? +# 3.4.3 ApplicationContextInitializedEvent +```java +// ApplicationContextInitializedEvent.java + +public class ApplicationContextInitializedEvent extends SpringApplicationEvent { + private final ConfigurableApplicationContext context; + /** + * Create a new {@link ApplicationContextInitializedEvent} instance. + * @param application the current application + * @param args the arguments the application is running with + * @param context the context that has been initialized + */ + public ApplicationContextInitializedEvent(SpringApplication application, + String[] args, ConfigurableApplicationContext context) { + super(application, args); + this.context = context; + } + /** + * Return the application context. + * @return the context + */ + public ConfigurableApplicationContext getApplicationContext() { + return this.context; + } +} +``` +可以看到`ApplicationContextInitializedEvent`事件多了个`ConfigurableApplicationContext`类型的`context`属性,`context`属性的作用同样是为了相应监听器可以拿到这个`context`属性执行一些逻辑,具体作用将在`3.4.4`详述。 + +`ApplicationContextInitializedEvent`事件在`ApplicationContext`容器创建后,且为`ApplicationContext`容器设置了`environment`变量和执行了`ApplicationContextInitializers`的初始化方法后但在bean定义加载前触发,标志ApplicationContext已经初始化完毕。 + +> **扩展:** 可以看到`ApplicationContextInitializedEvent`是在为`context`容器配置`environment`变量后触发,此时`ApplicationContextInitializedEvent`等事件只要有`context`容器的话,那么其他需要`environment`环境变量的监听器只需要从`context`中取出`environment`变量即可,从而`ApplicationContextInitializedEvent`等事件没必要再配置`environment`属性。 + +# 3.4.4 ApplicationPreparedEvent + +```java +// ApplicationPreparedEvent.java + +public class ApplicationPreparedEvent extends SpringApplicationEvent { + private final ConfigurableApplicationContext context; + /** + * Create a new {@link ApplicationPreparedEvent} instance. + * @param application the current application + * @param args the arguments the application is running with + * @param context the ApplicationContext about to be refreshed + */ + public ApplicationPreparedEvent(SpringApplication application, String[] args, + ConfigurableApplicationContext context) { + super(application, args); + this.context = context; + } + /** + * Return the application context. + * @return the context + */ + public ConfigurableApplicationContext getApplicationContext() { + return this.context; + } +} +``` +同样可以看到`ApplicationPreparedEvent`事件多了个`ConfigurableApplicationContext`类型的`context`属性,多了`context`属性的作用是能让监听该事件的监听器们能拿到`context`属性,监听器拿到`context`属性一般有如下作用: +1. 从事件中取出`context`属性,然后可以增加一些后置处理器,比如`ConfigFileApplicationListener`监听器监听到`ApplicationPreparedEvent`事件后,然后取出`context`变量,通过`context`变量增加了`PropertySourceOrderingPostProcessor`这个后置处理器; +2. 通过`context`属性取出`beanFactory`容器,然后注册一些`bean`,比如`LoggingApplicationListener`监听器通过`ApplicationPreparedEvent`事件的`context`属性取出`beanFactory`容器,然后注册了`springBootLoggingSystem`这个单例`bean`; +3. 通过`context`属性取出`Environment`环境变量,然后就可以操作环境变量,比如`PropertiesMigrationListener`。 + +`ApplicationPreparedEvent`事件在`ApplicationContext`容器已经完全准备好时但在容器刷新前触发,在这个阶段`bean`定义已经加载完毕还有`environment`已经准备好可以用了。 +# 3.4.5 ApplicationStartedEvent +```java +// ApplicationStartedEvent.java + +public class ApplicationStartedEvent extends SpringApplicationEvent { + private final ConfigurableApplicationContext context; + /** + * Create a new {@link ApplicationStartedEvent} instance. + * @param application the current application + * @param args the arguments the application is running with + * @param context the context that was being created + */ + public ApplicationStartedEvent(SpringApplication application, String[] args, + ConfigurableApplicationContext context) { + super(application, args); + this.context = context; + } + /** + * Return the application context. + * @return the context + */ + public ConfigurableApplicationContext getApplicationContext() { + return this.context; + } +} +``` +`ApplicationStartedEvent`事件将在容器刷新后但`ApplicationRunner`和`CommandLineRunner`的`run`方法执行前触发,标志`Spring`容器已经刷新,此时容器已经准备完毕了。 + +> **扩展:** 这里提到了`ApplicationRunner`和`CommandLineRunner`接口有啥作用呢?我们一般会在`Spring`容器刷新完毕后,此时可能有一些系统参数等静态数据需要加载,此时我们就可以实现了`ApplicationRunner`或`CommandLineRunner`接口来实现静态数据的加载。 +# 3.4.6 ApplicationReadyEvent +```java +// ApplicationReadyEvent.java + +public class ApplicationReadyEvent extends SpringApplicationEvent { + private final ConfigurableApplicationContext context; + /** + * Create a new {@link ApplicationReadyEvent} instance. + * @param application the current application + * @param args the arguments the application is running with + * @param context the context that was being created + */ + public ApplicationReadyEvent(SpringApplication application, String[] args, + ConfigurableApplicationContext context) { + super(application, args); + this.context = context; + } + /** + * Return the application context. + * @return the context + */ + public ConfigurableApplicationContext getApplicationContext() { + return this.context; + } +} +``` +`ApplicationReadyEvent`事件在调用完`ApplicationRunner`和`CommandLineRunner`的`run`方法后触发,此时标志`SpringApplication`已经正在运行。 +# 3.4.7 ApplicationFailedEvent + +```java +// ApplicationFailedEvent.java + +public class ApplicationFailedEvent extends SpringApplicationEvent { + private final ConfigurableApplicationContext context; + private final Throwable exception; + /** + * Create a new {@link ApplicationFailedEvent} instance. + * @param application the current application + * @param args the arguments the application was running with + * @param context the context that was being created (maybe null) + * @param exception the exception that caused the error + */ + public ApplicationFailedEvent(SpringApplication application, String[] args, + ConfigurableApplicationContext context, Throwable exception) { + super(application, args); + this.context = context; + this.exception = exception; + } + /** + * Return the application context. + * @return the context + */ + public ConfigurableApplicationContext getApplicationContext() { + return this.context; + } + /** + * Return the exception that caused the failure. + * @return the exception + */ + public Throwable getException() { + return this.exception; + } +} +``` +可以看到`ApplicationFailedEvent`事件除了多了一个`context`属性外,还多了一个`Throwable`类型的`exception`属性用来记录SpringBoot启动失败时的异常。 + +`ApplicationFailedEvent`事件在SpringBoot启动失败时触发,标志SpringBoot启动失败。 + +# 4 小结 +此篇文章相对简单,对SpringBoot内置的7种生命周期事件进行了详细分析。我们还是引用上篇文章的一张图来回顾一下这些生命周期事件及其用途: + +![](https://user-gold-cdn.xitu.io/2020/5/2/171d300d55cc4470?w=796&h=769&f=png&s=378851) + +# 5 写在最后 + +由于有一些小伙伴们建议之前有些源码分析文章太长,导致耐心不够,看不下去,因此,之后的源码分析文章如果太长的话,笔者将会考虑拆分为几篇文章,这样就比较短小了,比较容易看完,嘿嘿。 + +**【源码笔记】Github地址:** + +https://github.com/yuanmabiji/Java-SourceCode-Blogs + +**Star搞起来,嘿嘿嘿!** + diff --git a/SpringBoot/README.md b/SpringBoot/README.md index aab86a4..3b7766d 100644 --- a/SpringBoot/README.md +++ b/SpringBoot/README.md @@ -15,7 +15,8 @@ 7. [SpringBoot的启动流程是怎样的?SpringBoot源码(七)](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/SpringBoot/7%20SpringBoot%E7%9A%84%E5%90%AF%E5%8A%A8%E6%B5%81%E7%A8%8B%E6%98%AF%E6%80%8E%E6%A0%B7%E7%9A%84%EF%BC%9FSpringBoot%E6%BA%90%E7%A0%81%EF%BC%88%E4%B8%83%EF%BC%89.md) 8. [SpringApplication对象是如何构建的? SpringBoot源码(八)](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/SpringBoot/8%20SpringApplication%E5%AF%B9%E8%B1%A1%E6%98%AF%E5%A6%82%E4%BD%95%E6%9E%84%E5%BB%BA%E7%9A%84%EF%BC%9F%20SpringBoot%E6%BA%90%E7%A0%81%EF%BC%88%E5%85%AB%EF%BC%89.md) 9. [SpringBoot事件监听机制源码分析(上) SpringBoot源码(九)](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/SpringBoot/9%20SpringBoot%E4%BA%8B%E4%BB%B6%E7%9B%91%E5%90%AC%E6%9C%BA%E5%88%B6%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90(%E4%B8%8A)%20SpringBoot%E6%BA%90%E7%A0%81(%E4%B9%9D).md) -10. 持续更新中... +10. [SpringBoot内置生命周期事件详解 SpringBoot源码(十)](https://github.com/yuanmabiji/Java-SourceCode-Blogs/blob/master/SpringBoot/10%20SpringBoot%E5%86%85%E7%BD%AE%E7%94%9F%E5%91%BD%E5%91%A8%E6%9C%9F%E4%BA%8B%E4%BB%B6%E8%AF%A6%E8%A7%A3%20%20SpringBoot%E6%BA%90%E7%A0%81(%E5%8D%81).md) +11. 持续更新中... **阅读本源码专题文章结合SpringBoot项目(带中文注释)调试效果会更佳哦**

AltStyle によって変換されたページ (->オリジナル) /