juc
<h1>JUC知识点总结</h1>
<h2>1 JUC并发包概述</h2>
<p>JDK并发工具类是JDK1.5引入的一大重要的功能,集中在Java.util.concurrent包下。java.util.concurrent包主
要包含了并发集合类以及线程池和信号量三组重要工具类。java.util.concurrent包下还包括了
java.util.concurrent.atomic以及java.util.concurrent.locks两个子包。</p>
<p>它包括并发应用程序的锁、互斥、队列、线程池、轻量级任务、有效的并发集合、原子的算术操作和其它基本构
件,我们一般称这个包为 JUC。</p>
<ul>
<li>API中包之间的组织结构
<pre><code class="language-text">java.util.concurrent
java.util.concurrent.atomic
java.util.concurrent.locks</code></pre></li>
<li>
<p>java.util.concurrent包主要包含了 并发集合类 以及 线程池 和 信号量 三组重要工具类。</p>
<pre><code class="language-text">java.util.concurrent
接口:
BlockingDeque
BlockingQueue
Callable
CompletionService
CompletionStage
ConcurrentMap
ConcurrentNavigableMap
Delayed
Executor
ExecutorService
Future
REjectedExecutionHandler
RunnableFuture
RunnableScheduledFuture
ScheduleExecutorService
ScheduledFuture
ThreadFactory
TransferQueue</code></pre>
<pre><code class="language-text">java.util.concurrent
类:
AbstractExecutorService
ArrayBlockingQueue
CompletableFuture
ConcurrentHashMap
ConcurrentLinkedDeque
ConcurrentLinkedQueue
ConcurrentSkipListMap
ConcurrentSkipListSet
CopyOnWriteArrayList
CopyOnWriterArraySet
CountDownLatch
CountedCompleter
CyclicBarrier
DelayQueue
Exchange
ExecutionException
ExecutorCompletionService
Executors
ForkJoinPool
ForkJoinWorkerThread
FutureTask
LinkedBlockingDeque
LinkedBlockingQueue
LinkedTransferQueue
PriorityBlockingQueue
RecursiveAction
RecursiveTask
ScheduledThreadPoolExecutor
Semaphore
SynchronousQueue
ThreadPoolExecutor </code></pre>
<pre><code class="language-text">java.util.concurrent
异常:
BrokenBarrierException
CancellationException
CompletionException
RejectedExecutionException
TimeoutException</code></pre>
<pre><code class="language-text">java.util.concurrent
枚举:
TimeUnit</code></pre>
</li>
<li>
<p>java.util.concurrent.atomic包下是JDK提供的一组原子操作类</p>
</li>
<li>
<p>java.util.concurrent.locks包下是JDK提供的锁机制</p>
<ul>
<li>学习重点
<ol>
<li>原子操作类Atomic : AtomicInteger</li>
<li>锁机制类 Locks : Lock, Condition, ReadWriteLock</li>
<li>并发集合类Collections : Queue, ConcurrentMap</li>
<li>线程池相关类Execute : Future, Callable, Executor</li>
<li>信号量三组工具类Tools : CountDownLatch, CyclicBarrier, Semaphore</li>
</ol></li>
</ul>
</li>
</ul>
<h2>2 原子操作类 Atomic</h2>
<p>原理:Atomic 类主要利用 CAS (compare and swap) + volatile 和 native 方法来保证原子操作,从而避免 synchronized 的高开销,执行效率大为提升。</p>
<p>原子类的类结构API:</p>
<pre><code class="language-text">java.util.concurrent.atomic
类:
AtomicBoolean
AtomicInteger
AtomicIntegerArray
AtomicIntegerFieldUpdater
AtomicLong
AtomicLongArray
AtomicLongFieldUpdater
AtomicMarkableReference
AtomicReference
AtomicReferenceArray
AtomicReferenceFieldUpdater
AtomicStampedReference
DoubleAccumulator
DoubleAdder
LongAccumulator
LongAdder
Striped64</code></pre>
<p>根据修改的数据类型,可以将JUC包中的原子操作类可以分为4类。</p>
<ul>
<li>基本类型: AtomicInteger, AtomicLong, AtomicBoolean ;</li>
<li>数组类型: AtomicIntegerArray, AtomicLongArray, AtomicReferenceArray ;</li>
<li>引用类型: AtomicReference, AtomicStampedReference, AtomicMarkableReference ;</li>
<li>对象的属性修改类型: AtomicIntegerFieldUpdater, AtomicLongFieldUpdater, AtomicReferenceFieldUpdater 。</li>
</ul>
<p>这些类存在的目的是对相应的数据进行原子操作。所谓原子操作,是指操作过程不会被中断,保证数据操作是以原子方式进行的。</p>
<p>多个线程执行一个操作时,其中任何一个线程要么完全执行完此操作,要么没有执行此操作的任何步骤,那么这个操作就是原子的。</p>
<p>出现原因: synchronized的代价比较高。</p>
<h2>3 锁机制类 Locks</h2>
<p>java中的锁,可以分为同步锁和JUC包中的锁。</p>
<p>同步锁通过synchronized关键字来进行同步,实现对竞争资源的互斥访问的锁。</p>
<p>其原理是,对于每一个对象,有且仅有一个同步锁;不同的线程能共同访问该同步锁。但是,在同一个时间点,该同步锁能且只能被一个线程获取到。这样,获取到同步锁的线程就能进行CPU调度,从而在CPU上执行;而没有获取到同步锁的线程,必须进行等待,直到获取到同步锁之后才能继续运行。</p>
<p>JUC包中锁:</p>
<pre><code class="language-text">java.util.concurrent.locks
接口:
Condition
Lock
ReadWriteLock </code></pre>
<pre><code class="language-text">java.util.concurrent.locks
类:
AbstractOwnableSynchronizer
AbstractQueuedLongSynchronizer
AbstractQueuedSynchronizer
LockSupport
ReentrantLock
ReentrantReadWriteLock
StampedLock</code></pre>
<p>相比同步锁,JUC包中的锁的功能更加强大,它为锁提供了一个框架,该框架允许更灵活地使用锁,只是它的用法更难罢了。</p>
<h3>3.1 Lock 接口</h3>
<p>JUC包中的 Lock 接口支持那些语义不同(重入锁、公平锁、非公平锁等)的锁规则。</p>
<ul>
<li>“公平机制"是指"不同线程获取锁的机制是公平的”,而"非公平机制"则是指"不同线程获取锁的机制是非公平的",这里的公平指的是如果公平,那么等待锁的多个线程将会按照申请锁的时间顺序来依次获得锁。</li>
<li>"可重入的锁"是指同一个锁能够被一个线程多次获取。</li>
</ul>
<h3>3.2 ReadWriteLock 接口</h3>
<p>ReadWriteLock 接口以和 Lock 类似的方式定义了一些读取者可以共享而写入者独占的锁。JUC包只有一个类实现了该接口,即 ReentrantReadWriteLock,因为它适用于大部分的标准用法上下文。但程序员可以创建自己的、适用于非标准要求的实现。</p>
<h3>3.3 Condition 接口</h3>
<p>Condition需要和Lock联合使用,它的作用是代替Object监视器方法,可以通过await(),signal()来休眠/唤醒线程。Condition 接口描述了可能会与锁有关联的条件变量。这些变量在用法上与使用 Object.wait 访问的隐式监视器类似,但提供了更强大的功能。需要特别指出的是,单个 Lock 可能与多个 Condition 对象关联。为了避免兼容性问题,Condition 方法的名称与对应的 Object 版本中的不同。</p>
<h4>3.3.1 AQS抽象类</h4>
<p>AbstractQueuedSynchronizer 就是被称之为AQS的类,它是一个非常有用的超类,可用来定义锁以及依赖于排队阻塞线程的其他同步器;</p>
<p>ReentrantLock,ReentrantReadWriteLock,CountDownLatch,CyclicBarrier和Semaphore等这些类都是基于AQS类实现的。AbstractQueuedLongSynchronizer 类提供相同的功能但扩展了对同步状态的 64 位的支持。两者都扩展了类 AbstractOwnableSynchronizer(一个帮助记录当前保持独占同步的线程的简单类)。</p>
<h4>3.3.2 LockSupport 类</h4>
<p>LockSupport的功能和"Thread中的Thread.suspend()和Thread.resume()有点类似",LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程。但是park()和unpark()不会遇到“Thread.suspend 和Thread.resume所可能引发的死锁”问题。</p>
<h3>3.4 实现类</h3>
<h4>3.4.1 ReentrantLock 类</h4>
<p>ReentrantLock是独占锁。所谓独占锁,是指只能被独自占领,即同一个时间点只能被一个线程锁获取到的锁(互斥同步语义)。ReentrantLock锁包括"公平的ReentrantLock"和"非公平的ReentrantLock"(公平非公平语义)。"公平的ReentrantLock"是指"不同线程获取锁的机制是公平的",而"非公平的ReentrantLock"则是指"不同线程获取锁的机制是非公平的",ReentrantLock是"可重入的锁"(可重入语义)。
<img src="https://s2.ax1x.com/2019/12/17/QIUIrd.jpg" alt="ReetrantLock类图" /></p>
<ul>
<li>ReentrantLock实现了Lock接口。</li>
<li>ReentrantLock中有一个成员变量sync,sync是Sync类型;Sync是一个抽象类,而且它继承于AQS。</li>
<li>ReentrantLock中有"公平锁类"FairSync和"非公平锁类"NonfairSync,它们都是Sync的子类。
ReentrantReadWriteLock中sync对象,是FairSync与NonfairSync中的一种,这也意味着ReentrantLock是"公平锁"或"非公平锁"中的一种,ReentrantLock默认是非公平锁。</li>
</ul>
<h4>3.4.2 ReentrantReadWriteLock类</h4>
<p>ReentrantReadWriteLock是读写锁接口ReadWriteLock的实现类,它包括子类ReadLock和WriteLock。ReadLock是
共享锁,而WriteLock是独占锁。
<img src="https://s2.ax1x.com/2019/12/17/QIUoqA.jpg" alt="ReentrantReadWriteLock" /></p>
<ul>
<li>ReentrantReadWriteLock实现了ReadWriteLock接口。</li>
<li>ReentrantReadWriteLock中包含sync对象,读锁readerLock和写锁writerLock。读锁ReadLock和写锁WriteLock
都实现了Lock接口。</li>
<li>和"ReentrantLock"一样,sync是Sync类型;而且,Sync也是一个继承于AQS的抽象类。Sync也包括"公平锁"
FairSync和"非公平锁"NonfairSync。</li>
</ul>
<h2>4 并发集合类 Collections</h2>
<h3>4.1 java集合架构</h3>
<p>java集合的架构。主体内容包括Collection集合和Map类;而Collection集合又可以划分为List(队列)、Set(集合)
和Queue(队列)。
<img src="https://s2.ax1x.com/2019/12/17/QIUOG8.png" alt="java集合框架图" /></p>
<h4>4.1.1 List 的实现类</h4>
<p>List的实现类主要有: LinkedList, ArrayList, Vector, Stack。</p>
<ul>
<li>LinkedList是双向链表实现的双端队列;它不是线程安全的,只适用于单线程。</li>
<li>ArrayList是数组实现的队列,它是一个动态数组;它也不是线程安全的,只适用于单线程。</li>
<li>Vector是数组实现的矢量队列,它也一个动态数组;不过和ArrayList不同的是,Vector是线程安全的,它支持并发。</li>
<li>Stack是Vector实现的栈;和Vector一样,它也是线程安全的。</li>
</ul>
<h4>4.1.2 Set 的实现类</h4>
<p>Set的实现类主要有: HastSet和TreeSet。</p>
<ul>
<li>HashSet 是一个没有重复元素的集合,它通过HashMap实现的;HashSet不是线程安全的,只适用于单线程。</li>
<li>TreeSet也是一个没有重复元素的集合,不过和HashSet不同的是,TreeSet中的元素是有序的;它是通过
TreeMap实现的;TreeSet也不是线程安全的,只适用于单线程。</li>
</ul>
<h4>4.1.3 Map 的实现类</h4>
<p>Map的实现类主要有: HashMap,WeakHashMap, Hashtable和TreeMap。</p>
<ul>
<li>HashMap是存储“键-值对”的哈希表;它不是线程安全的,只适用于单线程。</li>
<li>WeakHashMap是也是哈希表;和HashMap不同的是,HashMap的“键”是强引用类型,而WeakHashMap的“键”是
弱引用类型,也就是说当WeakHashMap 中的某个键不再正常使用时,会被从WeakHashMap中被自动移除,
WeakHashMap也不是线程安全的,只适用于单线程。</li>
<li>Hashtable也是哈希表;和HashMap不同的是,Hashtable是线程安全的,支持并发。</li>
<li>TreeMap也是哈希表,不过TreeMap中的“键-值对”是有序的,它是通过R-B Tree(红黑树)实现的;TreeMap不
是线程安全的,只适用于单线程。</li>
</ul>
<h4>4.1.4 如何保证线程安全</h4>
<p>综合以上考虑,线程安全的实现类有vector,stack,hashtable 为了方便,我们将前面介绍集合类统称为
"java集合包"。java集合包大多是“非线程安全的”,虽然可以通过Collections工具类中的方法获取java集合包
对应的同步类,但是这些同步类的并发效率并不是很高。为了更好的支持高并发任务,并发大师Doug Lea在JUC
(java.util.concurrent)包中添加了java集合包中单线程类的对应的支持高并发的类。</p>
<h3>4.2 JUC 中的集合类</h3>
<p>为了方便讲诉,我将JUC包中的集合类划分为3部分来进行说明。</p>
<h4>4.2.1 JUC包中List和Set实现类</h4>
<p>JUC集合包中的List和Set实现类包括: CopyOnWriteArrayList, CopyOnWriteArraySet和ConcurrentSkipListSet。
ConcurrentSkipListSet稍后在说明Map时再说明,CopyOnWriteArrayList 和 CopyOnWriteArraySet的框架如图所示:
<img src="https://s2.ax1x.com/2019/12/17/QIHKit.jpg" alt="List and Set" /></p>
<ul>
<li>CopyOnWriteArrayList相当于线程安全的ArrayList,它实现了List接口。CopyOnWriteArrayList是支持高并发的。</li>
<li>CopyOnWriteArraySet相当于线程安全的HashSet,它继承于AbstractSet类。CopyOnWriteArraySet 内部包含一
个CopyOnWriteArrayList对象(聚合关系),它是通过CopyOnWriteArrayList实现的。</li>
</ul>
<h4>4.2.2 JUC包中的Map实现类</h4>
<p>JUC集合包中Map的实现类包括: ConcurrentHashMap和ConcurrentSkipListMap。它们的框架如下图所示:
<img src="https://s2.ax1x.com/2019/12/17/QIHMJP.jpg" alt="Map" /></p>
<ul>
<li>ConcurrentHashMap是线程安全的哈希表(相当于线程安全的HashMap);它继承于AbstractMap类,并且实现
ConcurrentMap接口。ConcurrentHashMap是通过“分段锁”来实现的,它支持并发。</li>
<li>ConcurrentSkipListMap是线程安全的有序的哈希表(相当于线程安全的TreeMap); 它继承于AbstractMap类,并
且实现ConcurrentNavigableMap接口。ConcurrentSkipListMap是通过“跳表”来实现的,它支持并发。</li>
<li>ConcurrentSkipListSet是线程安全的有序的集合(相当于线程安全的TreeSet);它继承于AbstractSet,并实现
了NavigableSet接口。ConcurrentSkipListSet是通过ConcurrentSkipListMap实现的,它也支持并发。</li>
</ul>
<h4>4.2.3 JUC包中的Queue实现类</h4>
<p>JUC集合包中Queue的实现类包括: ArrayBlockingQueue, LinkedBlockingQueue, LinkedBlockingDeque,
ConcurrentLinkedQueue和ConcurrentLinkedDeque。它们的框架如下图所示:
<img src="https://s2.ax1x.com/2019/12/17/QIHlz8.jpg" alt="Queue" /></p>
<ul>
<li>ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。</li>
<li>LinkedBlockingQueue是单向链表实现的(指定大小)阻塞队列,该队列按 FIFO(先进先出)排序元素。</li>
<li>LinkedBlockingDeque是双向链表实现的(指定大小)双向并发阻塞队列,该阻塞队列同时支持FIFO和FILO两种操
作方式。</li>
<li>ConcurrentLinkedQueue是单向链表实现的无界队列,该队列按 FIFO(先进先出)排序元素。</li>
<li>ConcurrentLinkedDeque是双向链表实现的无界队列,该队列同时支持FIFO和FILO两种操作方式。</li>
</ul>
<h2>5 线程池相关类 Execute</h2>
<h3>5.1 ThreadPoolExecutor类</h3>
<p><img src="https://s2.ax1x.com/2019/12/17/QIULPf.jpg" alt="ThreadPoolExecutor" /></p>
<p>java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程
池,必须先了解这个类。其源码如下:</p>
<pre><code class="language-java">public class ThreadPoolExecutor extends AbstractExecutorService {
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
}</code></pre>
<p>ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,前面三个构造器都是调用的第四
个构造器进行的初始化工作。各个参数的含义如下:</p>
<ul>
<li>
<p>int corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,
默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了
prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意
思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中
的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就
会把到达的任务放到缓存队列当中;</p>
</li>
<li>
<p>int maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多
少个线程;</p>
</li>
<li>
<p>long keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大
于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,<strong>即当线程池中的
线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不
超过corePoolSize。</strong>但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于
corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;</p>
</li>
<li>TimeUnit unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
<pre><code>TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒</code></pre></li>
<li>
<p>BlockingQueue<Runnable> workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,
会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:</p>
<pre><code>ArrayBlockingQueue;
PriorityBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;</code></pre>
<p>ArrayBlockingQueue 和 PriorityBlockingQueue 使用较少,一般使用 LinkedBlockingQueue 和
SynchronousQueue。线程池的排队策略与BlockingQueue有关。</p>
</li>
<li>ThreadFactory threadFactory:线程工厂,主要用来创建线程;</li>
<li>
<p>RejectedExecutionHandler handler:表示当拒绝处理任务时的策略,有以下四种取值:</p>
<pre><code>ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 </code></pre>
<p>ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:</p>
<pre><code class="language-java">public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}</code></pre>
<p>AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。接着看ExecutorService接口的实现:</p>
<pre><code class="language-java">public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}</code></pre>
<p>而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:</p>
<pre><code class="language-java">public interface Executor {
void execute(Runnable command);
}</code></pre>
<p>到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间
的关系了。</p>
</li>
<li>Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类
型,从字面意思可以理解,就是用来执行传进去的任务的;</li>
<li>然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及
shutDown等;</li>
<li>抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;</li>
<li>然后ThreadPoolExecutor继承了类AbstractExecutorService。</li>
</ul>
<p>在ThreadPoolExecutor类中有几个非常重要的方法:</p>
<pre><code class="language-text">execute()
submit()
shutdown()
shutdownNow()</code></pre>
<ul>
<li>execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是
ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。</li>
<li>submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在
ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不
同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过
它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。</li>
<li>shutdown()和shutdownNow()是用来关闭线程池的。</li>
<li>还有很多其他的方法:比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()
等获取与线程池相关属性的方法。</li>
</ul>
<h3>5.2 深入剖析线程池实现原理</h3>
<h4>5.2.1 线程池状态</h4>
<p>在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:</p>
<pre><code class="language-text">volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;</code></pre>
<p>runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;下面的几个static final
变量表示runState可能的几个取值。</p>
<ul>
<li>当创建线程池后,初始时,线程池处于RUNNING状态;</li>
<li>如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务
执行完毕;</li>
<li>如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正
在执行的任务;</li>
<li>当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程
池被设置为TERMINATED状态。</li>
</ul>
<h4>5.2.2 任务执行</h4>
<p>在了解将任务提交给线程池到任务执行完毕整个过程之前,先来看一下ThreadPoolExecutor类中其它一些比较重要
成员变量:</p>
<pre><code class="language-text">private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock(); //线程池的主要状态锁,对线程池状态(比如线程池大小
private final HashSet<Worker> workers = new HashSet<Worker>(); //用来存放工作集
private volatile long keepAliveTime; //线程存货时间
private volatile boolean allowCoreThreadTimeOut; //是否允许为核心线程设置存活时间
private volatile int corePoolSize; //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int maximumPoolSize; //线程池最大能容忍的线程数
private volatile int poolSize; //线程池中当前的线程数
private volatile RejectedExecutionHandler handler; //任务拒绝策略
private volatile ThreadFactory threadFactory; //线程工厂,用来创建线程
private int largestPoolSize; //用来记录线程池中曾经出现过的最大线程数
private long completedTaskCount; //用来记录已经执行完毕的任务个数</code></pre>
<p>进入正题,任务从提交到最终执行完毕经历了哪些过程。在ThreadPoolExecutor类中,最核心的任务提交方法是
execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,
所以我们只需要研究execute()方法的实现原理即可:</p>
<pre><code>//核心方法execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}</code></pre>
<p><img src="https://s2.ax1x.com/2019/12/17/QIHnII.jpg" alt="线程池处理流程" /></p>
<ol>
<li>首先,判断提交的任务command是否为null,若是null,则抛出空指针异常;</li>
<li>接着,<code>if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))</code>,如果线程池中当前线程数不
小于核心池大小,那么就会直接进入下面的if语句块了。如果线程池中当前线程数小于核心池大小,则接着执行
后半部分,也就是执行<code>!addIfUnderCorePoolSize(command)</code>。</li>
<li>如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行
完毕了。</li>
<li>然后,执行<code>if (runState == RUNNING && workQueue.offer(command))</code>如果当前线程池不处于RUNNING状态或者
任务放入缓存队列失败,则执行:<code>addIfUnderMaximumPoolSize(command)</code>。</li>
<li>如果执行addIfUnderMaximumPoolSize方法失败,则执行reject()方法进行任务拒绝处理。</li>
<li>第4步,如果语句<code>if (runState == RUNNING && workQueue.offer(command))</code>返回true,则继续进行判断:
<code>if (runState != RUNNING || poolSize == 0)</code>。这句判断是为了防止在将此任务添加进任务缓存队列的同时其
他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。如果是这样就执行:
<code>ensureQueuedTaskHandled(command)</code>,从名字可以看出是保证 添加到任务缓存队列中的任务得到处理。</li>
</ol>
<p>接着看2个关键方法的实现:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:</p>
<ul>
<li>addIfUnderCorePoolSize 方法的具体实现
<pre><code class="language-text">private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask); //创建线程去执行firstTask任务
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}</code></pre></li>
</ul>
<pre><code class="language-text">private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w); //创建一个线程,执行任务
if (t != null) {
w.thread = t; //将创建的线程的引用赋值为w的成员变量
workers.add(w);
int nt = ++poolSize; //当前线程数加1
if (nt > largestPoolSize)
largestPoolSize = nt;
}
return t;
}</code></pre>
<ul>
<li>addIfUnderMaximumPoolSize 方法的具体实现
<pre><code class="language-text">private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
if (t == null)
return false;
t.start();
return true;
}</code></pre></li>
</ul>
<h4>5.2.3 总结</h4>
<h5>5.2.3.1 线程池执行流程</h5>
<p>1)首先,要清楚corePoolSize和maximumPoolSize的含义;</p>
<p>2)其次,要知道Worker是用来起到什么作用的(Workers是一个工作集,要执行的任务Worker放在这个工作集中);</p>
<p>3)要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:</p>
<ul>
<li>如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;</li>
<li>如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添
加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创
建新的线程去执行这个任务;</li>
<li>如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;</li>
<li>如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直
至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空
闲时间超过keepAliveTime,线程也会被终止。</li>
</ul>
<h5>5.2.3.2 线程池初始化</h5>
<p>默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。在实际中如果需要线
程池创建之后立即创建线程,可以通过以下两个方法办到:</p>
<ul>
<li>prestartCoreThread():初始化一个核心线程;</li>
<li>prestartAllCoreThreads():初始化所有核心线程
<pre><code class="language-text">
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null); //注意传进去的参数是null
}</code></pre></li>
</ul>
<p>public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null))//注意传进去的参数是null
++n;
return n;
}</p>
<pre><code>注意上面传进去的参数是null,如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的
r = workQueue.take(); 即等待任务队列中有任务。
#####5.2.3.3 任务缓存及排队策略
任务缓存队列,即workQueue,它用来存放等待执行的任务。workQueue的类型为BlockingQueue<Runnable>,通常
可以取下面三种类型:
* ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
* LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
* synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
#####5.2.3.4 任务拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒
绝策略,通常有以下四种策略:
```text
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务</code></pre>
<h5>5.2.3.5 线程池的关闭</h5>
<p>ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:</p>
<ul>
<li>shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受
新的任务</li>
<li>shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。</li>
</ul>
<h5>5.2.3.6 线程池容量的动态调整</h5>
<p>ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize()</p>
<ul>
<li>setCorePoolSize:设置核心池大小</li>
<li>setMaximumPoolSize:设置线程池最大能创建的线程数目大小</li>
</ul>
<h2>6 工具类 Tools</h2>
<h3>6.1 CountDownLatch 类</h3>
<p>CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
。CountDownLatch的UML类图如下:</p>
<p><img src="https://s2.ax1x.com/2019/12/17/QIU7VI.jpg" alt="CountDownLatch" /></p>
<h3>6.2 CyclicBarrier 类</h3>
<p>CyclicBarrier[ˈsaɪklɪk][ˈbæriə(r)]是一个同步辅助类,允许一组线程互相等待,直到到达某个公共屏障点
(common barrier point)。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
<img src="https://s2.ax1x.com/2019/12/17/QIU7VI.jpg" alt="CyclicBarrier" /></p>
<p>CyclicBarrier是包含了“ReentrantLock对象lock"和"Condition对象trip”,它是通过独占锁实现的。</p>
<p>CyclicBarrier和CountDownLatch的区别是:</p>
<ul>
<li>CountDownLatch的作用是允许1或N个线程等待其他线程完成执行;而CyclicBarrier则是允许N个线程相互等待。</li>
<li>CountDownLatch的计数器无法被重置;CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier。</li>
</ul>
<h3>6.3 Semaphore 类</h3>
<p>Semaphore[ˈseməfɔ:®]是一个计数信号量,它的本质是一个“共享锁”。信号量维护了一个信号量许可集。
线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程
必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可。
<img src="https://s2.ax1x.com/2019/12/17/QIU5KH.jpg" alt="semaphore" /></p>
<p>和"ReentrantLock"一样,Semaphore包含了sync对象,sync是Sync类型;而且,Sync也是一个继承于AQS的抽象类。</p>
<blockquote>
<p>Sync也包括"公平信号量"FairSync和"非公平信号量"NonfairSync。</p>
</blockquote>