多线程
线程的状态。
一、线程池
- 提交任务时 4 种情况:
- 小于 corePoolSize addWorker()。
- 大于 corePoolSize workQueue.offer(command) 直接增加 task 如果增加失败就拒绝。
- 拒绝策略
- AbortPolicy 抛出异常,默认。
- CallerRunsPolicy 不使用线程池执行。
- DiscardPolicy 直接丢弃。
- DiscardOldestPolicy 丢弃队列中最旧的任务。
二、锁
Sychronized 原理
用法:
- 方法
- 代码块
在 JDK 1.6 之前,synchronized 只有传统的锁机制,因此给开发者留下了 synchronized 关键字相比于其他同步机制性能不好的印象。在 JDK 1.6 引入了两种新型锁机制:偏向锁和轻量级锁,它们的引入是为了解决在没有多线程竞争或基本没有竞争的场景下因使用传统锁机制带来的性能开销问题。
锁的升级: 偏向锁->轻量级锁->重量锁
锁的映射关系存在对象头中的。32 位系统上各状态如图所示:
偏向锁:
当 JVM 启用了偏向锁,那么新创建的对象都是可偏向状态,此时 mark word 里的 thread id 为 0,表示未偏向任何线程
加锁过程:
- 当对象第一次被线程获取锁时,发现是未偏向的,那就将 thread id 改为当前线程 id,成功继续执行同步块中的代码,失败则升级为轻量级锁
- 当被偏向的线程再次进入同步块时,发现锁偏向的就是当前线程,通过一些额外检查后就继续执行。
- 当其他线程进入同步块,发现有偏向的线程了,会进入撤销偏向锁逻辑。
解锁过程:
- 栈中的最近一条 lock record 的 obj 字段设置为 null
轻量级锁:
线程在执行同步块之前,JVM 会在线程的栈帧上建立一个 Lock Record。其包括了一个存储对象头中的 mark word 的 Displaced Mark Word 以及一个对象头指针。
加锁过程:
- 在线程栈中创建一个 Lock Record,将其 obj refercence 字段指向锁对象。
- 通过 CAS 指令将 Lock Record 地址放在对象头的 mark word 中,如果对象是无锁状态则修改成功,代表获取到了轻量级锁。如果失败进入步骤 3
- 如果线程以及持有该锁了,代表这是锁重入,设置 Lock Record 第一部分(Displaced Mark Word)为 null,起到了一个重入计数器的作用。然后结束
- 走到这一步说明发生了竞争,膨胀为重量锁。
解锁过程:
- 遍历线程栈,找到所有 obj 字段等于当前锁对象的 Lock Record
- 如果 Lock Record 的 Displaced Mark Word 为 null,代表是一次重入,将 obj 设为 null 后 continue
- 如果 Lock Record 的 Displaced Mark Word 不为 null,则利用 CAS 指令将对象头的 mark word 恢复成为 Displaced Mark Word。如果成功,则 continue,否则膨胀为重量级锁
重量级锁:
利用的是 JVM 的监视器(Monitor)
java 会为每个 object 对象分配一个 monitor,当某个对象的同步方法(synchronized methods )被多个线程调用时,该对象的 monitor 将负责处理这些访问的并发独占要求。
- 当 Sychronized 修饰在代码块上的时候,使用的是 monitorenter 指令和 monitorexit 指令。
monitorenter
过程如下:
- 如果 Monitor 的进入数为 0,则该线程进入 Monitor,然后进入数+1,然后该线程即为 Monitor 的所有者
- 如果线程已经占有了 Monitor 只是重新进入,则进入数+1
- 如果其他线程占有了,则线程阻塞,直到 Monitor 的进入数为 0,在尝试获取
monitorexit
过程如下:
- 指令执行时,Monitor 的进入数减一,如果进入数为 0,则线程退出 Monitor
- 其他被阻塞的线程可以尝试获取这个 Monitor 的所有权
- Synchronize 作用在方式里时,会加上一个 ACC_SYNCHRONIZED 标识。当有这个标识后,线程执行将先获取 Monitor,获取成功才能执行方法体。
三、AQS
// acquire方法获取资源占有权
public final void acquire(int arg) {
/** 尝试获取,tryAcquire方法是子类必须实现的方法,
* 比如公平锁和非公平锁的不同就在于tryAcquire方法的实现的不同。
* 获取失败,则addWaiter方法,包装node节点,放入node双向链表。再acquireQueued堵塞线程,循环获取资源占有权。
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
private Node addWaiter(Node mode) {
//新构建的node节点,waitStatus初始值为0
Node node = new Node(Thread.currentThread(), mode);
//Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果尾部不为空,则说明node双向链表之前已经被初始化了,那么直接把新node节点加入尾部
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果尾部为null,则说明node双向链表之前没有被初始化,则,调用enq方法,初始化node双向链表,并且把新节点加入尾部
enq(node);
return node;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
acquire 方法总结:
如果获取成功:则 state 加 1,并调用 AQS 的父类
AbstractOwnableSynchronizer 的设置独占线程,把当前独占线程设置当前线程。
如果调用失败:则说明,前面已经有线程占用了这个资源,需要等待的线程释放。则把当前线程封装成 node 节点,放入 node 双向链表,之后 Locksupport.pack()堵塞当前线程。假如这个线程堵塞后被唤醒,则继续循环调用 tryAcquire 方法获取资源许可,获取到了,则把自身 node 节点设置为 node 链表的头节点,把之前的头节点去掉。
node 节点的 waitStatus 为 signal,则意味这其 next 节点可以被唤醒。
release 方法总结:
如果线程释放资源,调用 release 方法,release 方法会调用 tryRelease 方法尝试释放资源,如果释放成功,tryRelease 方法会将 state 减 1,再调用 AQS 的父类
AbstractOwnableSynchronizer 的设置独占线程为 null,再 locksupport.unpack()双向 node 链表的头 node 节点的线程,恢复其执行。
四、实战
顺序打印 ABC。
/**
* @description:
* @author: mmc
* @create: 2020-01-03 09:42
**/
public class ThreadABC {
private static Object A = new Object();
private static Object B = new Object();
private static Object C = new Object();
private static class ThreadPrint extends Thread{
private String name;
private Object prev;
private Object self;
public ThreadPrint(String name,Object prev,Object self){
this.name=name;
this.prev=prev;
this.self=self;
}
public void run() {
for (int i = 0; i < 10; i++) {
synchronized (prev) {
synchronized (self) {
System.out.println(name);
self.notifyAll();
}
try {
if(i>=9){
prev.notifyAll();
}else {
prev.wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
public static void main(String[] args) throws InterruptedException {
ThreadPrint threadA = new ThreadPrint("A",C,A);
ThreadPrint threadB = new ThreadPrint("B",A,B);
ThreadPrint threadC = new ThreadPrint("C",B,C);
threadA.start();
Thread.sleep(10);
threadB.start();
Thread.sleep(10);
threadC.start();
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.