JUC并发编程

基础知识

一把锁

synchronized

两个并(并发、并行)

并发:

  • 是在同一实体上的多个事件
  • 是在一合处理器上“同时”处理多个任务
  • 同一时刻,其实是只有一个事件在发生

并行:

  • 是在不同实体上的多个事件
  • 是在多台处理器上同时处理多个任务
  • 同一时刻,大家真的都在做事情

三个程(进程、线程、管程)

进程:简单的说,在系统中运行的一个应用程序就是一个进程,每一个进程都有它自己的内存空间和系统资源。

线程:也被称为轻量级进程,在同一个进程内会有1个或多个线程,是大多数操作系统进行时序调度的基本单元。

管程:Monitor(监视器),也就是我们平时所说的锁,Monitor其实是一种同步机制,他的义务是保证(同一时间)只有一个线程可以访问被保护的数据和代码。JVM中同步是基于进入和退出监视器对象(Monitor,管程对象)来实现的,每个对象实例都会有一个Monitor对象。Monitor对象会和Java对象一同创建并销毁。

同步指令

执行线程就要求先成功持有管程,然后才能执行方法,最后当方法完成 (无论是正當完成还是非正常完成)时释放管程。在方法执行期间,执行线程持有了管程,其他任何线程都无法再获取到同一个管程。

如果一个同步方法执行期间拋出了异常,并且在方法内部无法处理此异常,那这个同步方法所持有的管程将在异常抛到同步方法边界之外时自动释放。同步一段指令集序列通常是由Java语言中的synchronized语句块来表示的,Java虛拟机的指令集中有 monitorenter 和 monitorext 两条指令来支持synchronized关键字的语义,正确实现synchronized关键宇需要Javac编译器与Java虛拟机两者共同协作支持。

用户线程和守护线程

用户线程(User Thread)

是系统的工作线程,它会完成这个程序需要完成的业务操作。

守护线程(Daemon Thread)

是一种特殊的线蛋为基宫线程服务的,在后台默默地完成一些系统性的服务,比如垃圾回收线程就是最典型的例子

守护线程作为一个服务线程,没有服务对象就没有必要继续运行了,如果用户线程全部结束了,意味着程序需要完成的业务操作已经结束了,系统可以退出了。所以假如当系统只剩下守护线程的时候,java虛拟机会自动退出。

总结

  • 如果用户线程全部结束意味着程序需要完成的业务操作已经结束了,守护线程随者JVM一同结束工作
  • setDaemon(true)方法必须在start()之前设置,否则报IllegalThreadStateException异常

CompletableFuture

Future接口理论知识复习

Future接口(FutureTask实现类)定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。

Future是Java5新加的一个接口,它提供了一种异步并行计算的功能

如果主线程需要执行一个很耗时的计算任务,我们就可以通过future把这个任务放到异步线程中执行。

主线程继续处理其他任务或者先行结束,再通过Future 获取计算结果。

Future接口常用实现类FutureTask异步任务

优点

  • future+线程池异步多线程任务配合,能显著提高程序的执行效率。

缺点

  • get()阻塞,一旦调用get()方法求结果,如果计算没有完成容易导致程序阻塞
  • isDone()轮询,轮询的方式会耗费无谓的CPU资源,而且也不见得能及时地得到计算结果。如果想要异步获取结果,通常都会以轮询的方式去获取结果。尽量不要阻寒

总结

Future对于结果的获取不是很友好,只能通过阻塞或轮询的方式得到任务的结果。

CompletableFuture对Future的改进

FutureTask阻塞的方式和异步编程的设计理念相违背,而轮询的方式会耗费无谓的CPU资源。CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方。

CompletionStage

  • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
  • 一个阶段的计算执行可以是一个Function, Consumer或者Runnable。比如:stage.thenApply(x-> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

CompletableFuture

  • 在Java8中,CompletableFuture提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,并且提供了函数式编程的能力,可以通过回调的方式处理计算结果,也提供了转换和组合 CompletableFuture 的方法。
  • 它可能代表一个明确完成的Future,也有可能代表一个完成阶段(CompletionStage),它支持在计算完成以后触发一些函数或执行某些动作。
  • 它实现了Future和CompletionStage接口

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package org.monochrome.cf;

import java.util.concurrent.*;

/**
* @author monochrome
* @date 2022/11/24
*/
public class CompletableFutureUseDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {

ExecutorService threadPool = Executors.newFixedThreadPool(3);
try {

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "---come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// 模拟抛异常
// if (true) {
// throw new RuntimeException("get result after 1s wrong!!!");
// }
System.out.println("get result after 1s:" + result);
return result;
}, threadPool).whenComplete((v, e) -> {
if (e == null) {
System.out.println("successful! get result:" + v);
}
}).exceptionally(e -> {
System.out.println(e.getMessage());
return null;
});
} finally {
threadPool.shutdown();
}
System.out.println(Thread.currentThread().getName() + " do something");
// 主线程不要立刻结束,否则completableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
// try {
// TimeUnit.SECONDS.sleep(3);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
}

private static void future() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + "---come in");
int result = ThreadLocalRandom.current().nextInt(10);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("get result after 1s:" + result);
return result;
});
System.out.println(Thread.currentThread().getName() + " do something");
System.out.println(completableFuture.get());
}
}

案例精讲-从电商网站的比价需求说开去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package org.monochrome.cf;

import lombok.Getter;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
*
* 案例说明:电商比价需求,模拟如下情况:
*
* 1 需求:
* 1.1 同一款产品,同时搜索出同款产品在各大电商平台的售价;
* 1.2 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少
*
* 2 输出:出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List<String>
* 《mysql》 in jd price is 88.05
* 《mysql》 in dangdang price is 86.11
* 《mysql》 in taobao price is 90.43
*
* 3 技术要求
* 3.1 函数式编程
* 3.2 链式编程
* 3.3 Stream流式计算
*
* @author monochrome
* @date 2022/11/25
*/
public class CompletableFutureMallDemo {

static List<NetMall> netMalls = Arrays.asList(
new NetMall("jd"),
new NetMall("tmall"),
new NetMall("pdd"),
new NetMall("taobao")
);

public static void main(String[] args) {
long startTime = System.currentTimeMillis();

List<String> prices = getPrices(netMalls, "mysql");
prices.forEach(System.out::println);

long endTime = System.currentTimeMillis();
System.out.println("Step by Step ----cost Time: " + (endTime - startTime) +"ms");
long startTime1 = System.currentTimeMillis();

List<String> pricesByCompletableFuture = getPricesByCompletableFuture(netMalls, "mysql");
pricesByCompletableFuture.forEach(System.out::println);

long endTime1 = System.currentTimeMillis();
System.out.println("Completable Future ----cost Time: " + (endTime1 - startTime1) +"ms");


}

/**
* step by step 一家家搜查
*
* @param productName
* @return
*/
public static List<String> getPrices(List<NetMall> netMalls, String productName) {
List<String> prices = netMalls.stream()
.map(netMall -> String.format("《%s》 in %s price is %.2f", productName, netMall.getMallName(), netMall.calcPrice(productName)))
.collect(Collectors.toList());
return prices;

}

public static List<String> getPricesByCompletableFuture(List<NetMall> netMalls, String productName) {
List<String> prices = netMalls.stream()
.map(netMall -> CompletableFuture.supplyAsync(() -> {
return String.format("《%s》 in %s price is %.2f", productName, netMall.getMallName(), netMall.calcPrice(productName));
}))
.collect(Collectors.toList())
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
return prices;
}

}

class NetMall {
@Getter
private String mallName;

public NetMall(String mallName) {
this.mallName = mallName;
}

public double calcPrice(String productName) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
}

}

CompletableFuture API

获得结果和触发计算(get、getNow、join、complete)

  • public T get( ):不见不散(会抛出异常),只要调用了get( )方法,不管是否计算完成都会导致阻塞

  • public T get(long timeout, TimeUnit unit):过时不候

  • public T getNow(T valuelfAbsent):没有计算完成的情况下,给我一个替代结果计算完,返回计算完成后的结果、没算完,返回设定的valuelfAbsent

  • public T join( ):join方法和get( )方法作用一样,不同的是,join方法不抛出异常

  • public boolean complete(T value):是否打断get方法立刻返回括号值

对计算结果进行处理(thenApply、handle)

  • public <U> CompletableFuture<U> thenApply:计算结果存在依赖关系,这两个线程串行化,由于存在依赖关系(当前步错,不走下一步),当前步骤有异常的话就叫停
  • public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn):计算结果存在依赖关系,这两个线程串行化,有异常也可以往下一步走,根据带的异常参数可以进一步处理
  • whenComplete:是执行当前任务的线程执行继续执行whenComplete的任务
  • whenCompleteAsync:是执行把whenCompleteAsync这个任务继续提交给线程池来进行执行

对计算结果进行消费(thenRun、thenAccept、thenApply)

  • thenRun(Runnable runnable):任务A执行完执行B,并且B不需要A的结果
  • CompletableFuture<Void> thenAccept(Consumer<? super T> action):任务A执行完成执行B,B需要A的结果,但是任务B无返回值
  • public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn):任务A执行完成执行B,B需要A的结果,同时任务B有返回值

thenRun和thenRunAsync区别

  • 没有传入自定义线程池,都用默认线程池ForkJoinPool;
  • 如果你执行第一个任务的时候,传入了一个自定义线程池:
    ​ 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池
    ​ 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第一心任务使用的是ForkJoin线程池
  • 备注:有可能处理太快,系统优化切换原则,直接使用main线程处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action,
Executor executor) {
return uniRunStage(screenExecutor(executor), action);
}
private static final boolean useCommonPool =
(ForkJoinPool.getCommonPoolParallelism() > 1);

private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

对计算速度进行选用(applyToEither、acceptEither、runAfterEither)

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn):这个方法表示的是,谁快就用谁的结果

两任务组合,一个完成

  • applyToEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值
  • acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值
  • runAfterEither:两个任务有一个执行完成,不需要获取 future 的结果,处理任务,也没有返回值

对计算结果进行合并(thenCombine、thenAcceptBoth、runAfterBoth)

  • public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn):两个CompletionStage任务都完成后,最终把两个任务的结果一起交给thenCombine来处理。先完成的先等着,等待其他分支任务

多任务组合(allOf、anyOf)

  • allOf:等待所有任务完成
  • anyOf:只要有一个任务完成

JDK8锁

乐观锁和悲观锁

悲观锁

概念:认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改
适合写操作多的场景,先加锁可以保证写操作时数据正确(写操作包括增删改)、显式的锁定之后再操作同步资源

  • synchronized关键字和Lock的实现类都是悲观锁

乐观锁

概念:乐观锁认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据。如果这个数据没有被更新,当前线程将自己修改的数据成功写入。如果数据已经被其他线程更新,则根据不同的实现方式执行不同的操作,比如放弃修改、重试抢锁等。

  • 乐观锁在Java中通过使用无锁编程来实现,最常采用的时CAS算法,Java原子类中的递增操作就通过CAS自旋实现的
  • 适合读操作多的场景,不加锁的特点能够使其读操作的性能大幅度提升
  • 乐观锁一般有两种实现方式(采用版本号机制、CAS算法实现)

8锁问题

①. 标准访问有ab两个线程,请问先打印邮件还是短信

②. sendEmail方法暂停3秒钟,请问先打印邮件还是短信

③. 新增一个普通的hello方法,请问先打印邮件还是hello

④. 有两部手机,请问先打印邮件还是短信

⑤. 两个静态同步方法,同1部手机,请问先打印邮件还是短信

⑥. 两个静态同步方法, 2部手机,请问先打印邮件还是短信

⑦. 1个静态同步方法,1个普通同步方法,同1部手机,请问先打印邮件还是短信

⑧. 1个静态同步方法,1个普通同步方法,2部手机,请问先打印邮件还是短信

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class Phone { //资源类
public static synchronized void sendEmail() {
//暂停几秒钟线程
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("-------sendEmail");
}

public synchronized void sendSMS() {
System.out.println("-------sendSMS");
}

public void hello() {
System.out.println("-------hello");
}
}

public class Lock8Demo {
public static void main(String[] args) {//一切程序的入口,主线程
Phone phone = new Phone();//资源类1
Phone phone2 = new Phone();//资源类2

new Thread(() -> {
phone.sendEmail();
}, "a").start();

//暂停毫秒
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}

new Thread(() -> {
//phone.sendSMS();
//phone.hello();
phone2.sendSMS();
}, "b").start();

}
}
/**
* ============================================
* 1-2
* * 一个对象里面如果有多个synchronized方法,某一个时刻内,只要一个线程去调用其中的一个synchronized方法了,
* * 其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一的一个线程去访问这些synchronized方法
* * 锁的是当前对象this,被锁定后,其它的线程都不能进入到当前对象的其它的synchronized方法
* <p>
* 3-4
* * 加个普通方法后发现和同步锁无关
* * 换成两个对象后,不是同一把锁了,情况立刻变化。
* <p>
* 5-6
* 都换成静态同步方法后,情况又变化
* 三种 synchronized 锁的内容有一些差别:
* 对于普通同步方法,锁的是当前实例对象,通常指this,具体的一部部手机,所有的普通同步方法用的都是同一把锁——实例对象本身,
* 对于静态同步方法,锁的是当前类的Class对象,如Phone.class唯一的一个模板
* 对于同步方法块,锁的是 synchronized 括号内的对象
* <p>
* 7-8
* 当一个线程试图访问同步代码时它首先必须得到锁,退出或抛出异常时必须释放锁。
* *
* * 所有的普通同步方法用的都是同一把锁——实例对象本身,就是new出来的具体实例对象本身,本类this
* * 也就是说如果一个实例对象的普通同步方法获取锁后,该实例对象的其他普通同步方法必须等待获取锁的方法释放锁后才能获取锁。
* *
* * 所有的静态同步方法用的也是同一把锁——类对象本身,就是我们说过的唯一模板Class
* * 具体实例对象this和唯一模板Class,这两把锁是两个不同的对象,所以静态同步方法与普通同步方法之间是不会有竞态条件的
* * 但是一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁。
**/

从字节码角度分析synchronized实现

反汇编:javap -v -p *.class

synchronized有三种应用方式

  • 作用于实例方法,当前实例加锁,进入同步代码前要获得当前实例的锁
  • 作用于代码块,对括号里配置的对象加锁
  • 作用于静态方法,当前类加锁,进去同步代码前要获得当前类对象的锁

synchronized同步代码块

实现使用的是monitorenter和monitorexit指令

synchronized同步代码

一定是一个enter和两个exit吗?

不一定,如果方法中直接抛出了异常处理,那么就是一个monitorenter和一个monitorexit

synchronized同步普通方法

调用指令将会检查方法的ACC_SYNCHRONIZED访问标志是否被设置,如果设置了,执行线程会将先持有monitor然后再执行方法,最后再方法完成(无论是正常完成还是非正常完成)时释放minotor

synchronized同步普通方法

synchronized同步静态方法

ACC_STATIC、ACC_SYNCHRONIZED访问标志区分该方法是否静态同步方法

synchronized同步静态方法

反编译synchronized锁的是什么

任何一个对象都可以成为一个锁,在HotSpot虚拟机中,monitor采用ObjectMonitor实现

ObjectMonitor.java — ObjectMonitor.cpp — ObjectMonitor.hpp

每一个对象都带有一个对象监视器,每一个被锁的对象都会和Monitor关联起来

ObjectMonitor.hpp(底层源码解析)

公平锁和非公平锁

什么是公平锁和非公平锁

公平锁:是指多个线程按照申请锁的顺序来获取锁

非公平锁:是指在多线程获取锁的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取到锁,在高并发的情况下,有可能造成优先级反转或者饥饿现象

注意:synchronized 和 ReentrantLock 默认是非公平锁

排队抢票案例(公平出现锁饥饿)

锁饥饿:我们使用5个线程买100张票,使用ReentrantLock默认是非公平锁,获取到的结果可能都是A线程在出售这100张票,会导致B、C、D、E线程发生锁饥饿(使用公平锁会有什么问题)

为什么会有公平锁、非公平锁的设计?为什么默认非公平?

恢复挂起的线程到真正锁的获取还是有时间差的,从开发人员来看这个时间微乎其微,但是从CPU的角度来看,这个时间存在的还是很明显的,所以非公平锁能更充分的利用CPU的时间片,尽量减少CPU空闲状态时间

使用多线程很重要的考量点是线程切换的开销,当采用非公平锁时,当一个线程请求锁获取同步状态,然后释放同步状态,因为不需要考虑是否还有前驱节点,所以刚释放锁的线程在此刻再次获取同步状态的概率就变得非常大了,所以就减少了线程的开销线程的开销

什么时候用公平?什么时候用非公平?

如果为了更高的吞吐量,很显然非公平锁是比较合适的,因为节省很多线程切换时间,吞吐量自然就上去了。否则那就用公平锁,大家公平使用

源码解读(ReentrantLock默认是非公平锁)

公平锁:排序排队公平锁,就是判断同步队列是否还有先驱节点的存在(我前面还有人吗?),如果没有先驱节点才能获锁

先占先得非公平锁,是不管这个事的,只要能抢获到同步状态就可以

ReentrantLock默认是非公平锁,公平锁要多一个方法,所以非公平锁的性能更好(aqs源码)
tryAcquire

nonfairTryAcquire

可重入锁(又名递归锁)

什么是可重入锁?

可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提,锁对象得是同一个对象),不会因为之前已经获取过还没有释放而阻塞

如果是1个有synchronized修饰得递归调用方法,程序第2次进入被自己阻塞了岂不是天大的笑话,出现了作茧自缚

所以Java中ReentrantLock和Synchronized都是可重入锁,可重入锁的一个优点是可在一定程度避免死锁

可重入锁这四个字分开解释

可: 可以 | 重: 再次 | 入: 进入 | 锁: 同步锁 | 进入什么:进入同步域(即同步代码块、方法或显示锁锁定的代码)

可重入锁的种类

隐式锁(即synchronized关键字使用的锁)默认是可重入锁,在同步块、同步方法使用

在一个synchronized修饰的方法或者代码块的内部调用本类的其他synchronized修饰的方法或代码块时,是永远可以得到锁的
显示锁(即Lock)也有ReentrantLock这样的可重入锁

lock和unlock一定要一 一匹配,如果少了或多了,都会坑到别的线程

Synchronized的重入的实现机理(为什么任何一个对象都可以成为一个锁)

  • 每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针
  • 当执行monitorenter时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将计数器加1
  • 在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程时当前线程,那么Java虚拟机可以将其计数器加1,否则需要等待,直到持有线程释放该锁
  • 当执行monitorexit,Java虚拟机则需将锁对象的计数器减1。计数器为零代表锁已经释放

死锁及排查

什么是死锁?

死锁是指两个或两个以上的线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力干涉那它们都将无法推进下去,如果资源充足,进程的资源请求都能够得到满足,死锁出现的可能性就很低,否则就会因争夺有限的资源而陷入死锁

死锁

产生死锁的原因

  1. 系统资源不足
  2. 进程运行推进的顺序不合适
  3. 资源分配不当

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class DeadLockDemo {
public static void main(String[] args) {
Object objectA = new Object();
Object objectB = new Object();

new Thread(() -> {
synchronized (objectA) {
System.out.println(Thread.currentThread().getName()+" got objectA, try to get objectB");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (objectB) {
System.out.println(Thread.currentThread().getName()+" got objectB");
}
}
}, "t1").start();
new Thread(() -> {
synchronized (objectB) {
System.out.println(Thread.currentThread().getName()+" got objectB, try to get objectA");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
synchronized (objectA) {
System.out.println(Thread.currentThread().getName()+" got objectA");
}
}
}, "t2").start();

}
}

如何排除死锁

方式一:纯命令

1
2
jps -l
jstack <pid>

方式二:jconsole

jconsole检测死锁

Synchronized、ReentrantLock实现生产者和消费者问题

生产者和消费者模式概述

生产者消费者模式是一个十分经典的多线程协作的模式,弄懂生产者消费者问题能够让我们对多线程编程的理解更加深刻。所谓生产消费者问题,实际上主要是包含了两类线程:

  • 一类是生产者线程用于生产数据
  • 一类是消费者线程用于消费数据

为了耦合生产者和消费者的关系,通常会采用共享的数据区域,就像一个仓库

  • 生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为
  • 消费者只需要从共享数据区中获取数据,并不需要关心生产者的行为

线程四句口诀

  1. 在高内聚低耦合的前提下,线程 - >操作 - >资源类

    [假如有一个空调,三个人去操作这个空调,高内聚低耦合是指空调有制热制冷的效果,它会把这两个抽取成一个方法,对外以接口的形式去暴露,提供给操作空调的人或线程使用]

  2. 判断|操作|唤醒 [ 生产消费中 ]

  3. 多线程交互中,必须要防止多线程的虚假唤醒,也即**(判断使用while,不能使用if)**

  4. 标志位

使用Synchronized实现(隐式锁)

为了体现生产和消费过程总的等待和唤醒,Java就提供了几个方法供我们使用,这几个方法就在Object类中Object类的等待和唤醒方法(隐式锁)

  • viod wait():导致当前线程等待,直到另一个线程调用该对象的notify()方法和notifyAll()方法
  • void notify():唤醒正在等待对象监视器的单个线程
  • void notifyAll():唤醒正在等待对象监视器的所有线程

(注意:wait、notify、notifyAll方法必须要在同步块或同步方法里且成对出现使用)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package org.monochrome.producerandconsumer;

/**
* 1.题目:
* 现在两个线程,可以操作初始值为0的一个变量,实现一个线程对该变量加1,
* 一个线程对该变量减1,交替执行,来10轮,变量的初始值为0
* 2.思想:
* 1.在高内聚低耦合的前提下,线程->操作->资源类
* 2.判断操作唤醒[生产消费中]
* 3.多线程交互中,必须要放置多线程的虚假唤醒,也即(判断使用while,不能使用if)
*/
public class ThreadWaitNotifyBySynchronizedDemo {
public static void main(String[] args) {
AirCondition airCondition = new AirCondition();
new Thread(() -> {
for (int i = 1; i < 11; i++) airCondition.increment();
}, "线程A").start();
new Thread(() -> {
for (int i = 1; i < 11; i++) airCondition.decrement();
}, "线程B").start();
new Thread(() -> {
for (int i = 1; i < 11; i++) airCondition.increment();
}, "线程C").start();
new Thread(() -> {
for (int i = 1; i < 11; i++) airCondition.decrement();
}, "线程D").start();
}
}

class AirCondition {
private int number = 0;

public synchronized void increment() {
//1.判断
/* if(number!=0){*/
while (number != 0) {
try {
//为什么不用if?解释如下
//第一次A进来了,在number++后(number=1) C抢到执行权,进入wait状态
//这个时候,A抢到cpu执行权,也进入wait状态,此时,B线程进行了一次消费
//唤醒了线程,这个时候A抢到CPU执行权,不需要做判断,number++(1),唤醒线程
//C也抢到CPU执行权,不需要做判断,number++(2)
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//2.干活
number++;
System.out.println(Thread.currentThread().getName() + ":" + number);
//3.唤醒
this.notifyAll();
}

public synchronized void decrement() {
/*if (number==0){*/
while (number == 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
number--;
System.out.println(Thread.currentThread().getName() + ":" + number);
this.notifyAll();
}
}

使用ReentrantLock实现(显式锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package org.monochrome.producerandconsumer;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* 1.题目:
* 现在两个线程,可以操作初始值为0的一个变量,实现一个线程对该变量加1,
* 一个线程对该变量减1,交替执行,来10轮,变量的初始值为0
* 2.思想:
* 1.在高内聚低耦合的前提下,线程->操作->资源类
* 2.判断操作唤醒[生产消费中]
* 3.多线程交互中,必须要放置多线程的虚假唤醒,也即(判断使用while,不能使用if)
*/
public class ThreadWaitNotifyByReentrantLockDemo {
public static void main(String[] args) {
AirCondition2 airCondition = new AirCondition2();
new Thread(() -> {
for (int i = 0; i < 10; i++) airCondition.decrement();
}, "线程A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) airCondition.increment();
}, "线程B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) airCondition.decrement();
}, "线程C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) airCondition.increment();
}, "线程D").start();
}
}

class AirCondition2 {
private int number = 0;
//定义Lock锁对象
final Lock lock = new ReentrantLock();
final Condition condition = lock.newCondition();

//生产者,如果number=0就 number++
public void increment() {
lock.lock();
try {
//1.判断
while (number != 0) {
try {
condition.await();//this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//2.干活
number++;
System.out.println(Thread.currentThread().getName() + ":\t" + number);
//3.唤醒
condition.signalAll();//this.notifyAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

//消费者,如果number=1,就 number--
public void decrement() {
lock.lock();
try {
//1.判断
while (number == 0) {
try {
condition.await();//this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//2.干活
number--;
System.out.println(Thread.currentThread().getName() + ":\t" + number);
//3.唤醒
condition.signalAll();//this.notifyAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

阻塞队列

概念:当阻塞队列为空时,获取(take)操作是阻塞的;当阻塞队列为满时,添加(put)操作是阻塞的。

BlockingQueue

为什么用?有什么好处?

好处:阻塞队列不用手动控制什么时候该被阻塞,什么时候该被唤醒,简化了操作。在concurrent包发布以前,在多线程环境下,我们每个程序员都必须自己去控制这些细节,尤其还要兼顾效率和线程安全,而这会给我们的程序带来不小的复杂度.

阻塞队列种类

体系CollectionQueueBlockingQueue→七个阻塞队列实现类。

类名 作用
ArrayBlockingQueue 数组构成的有界阻塞队列
LinkedBlockingQueue 链表构成的有界阻塞队列
PriorityBlockingQueue 支持优先级排序的无界阻塞队列
DelayQueue 支持优先级的延迟无界阻塞队列
SynchronousQueue 单个元素的阻塞队列
LinkedTransferQueue 由链表构成的无界阻塞队列
LinkedBlockingDeque 由链表构成的双向阻塞队列

粗体标记的三个用得比较多,许多消息中间件底层就是用它们实现的。

需要注意的是LinkedBlockingQueue虽然是有界的,但有个巨坑,其默认大小是Integer.MAX_VALUE,高达21亿,一般情况下内存早爆了(在线程池的ThreadPoolExecutor有体现)。

API

API:抛出异常是指当队列满时,再次插入会抛出异常;返回布尔是指当队列满时,再次插入会返回false;阻塞是指当队列满时,再次插入会被阻塞,直到队列取出一个元素,才能插入。超时是指当一个时限过后,才会插入或者取出。

方法类型 抛出异常 返回布尔 阻塞 超时
插入 add(E e) offer(E e) put(E e) offer(E e,Time,TimeUnit)
取出 remove() poll() take() poll(Time,TimeUnit)
队首 element() peek()
抛出异常 当阻塞队列满时,再往队列里面add插入元素会抛IllegalStateException: Queue full
当阻塞队列空时,再往队列Remove元素时候回抛出NoSuchElementException
特殊值 插入方法,成功返回true 失败返回false
移除方法,成功返回元素,队列里面没有就返回null
一直阻塞 当阻塞队列满时,生产者继续往队列里面put元素,队列会一直阻塞直到put数据or响应中断退出。
当阻塞队列空时,消费者试图从队列take元素,队列会一直阻塞消费者线程直到队列可用。
超时退出 当阻塞队列满时,队列会阻塞生产者线程一定时间,超过后限时后生产者线程就会退出

SynchronousQueue

队列只有一个元素,如果想插入多个,必须等队列元素取出后,才能插入,只能有一个“坑位”,用一个插一个。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
* @author Monochrome
* @date 2021/5/26
*/
public class SynchronousQueueDemo {

public static void main(String[] args) {
BlockingQueue<String> blockingQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "\t put 1");
blockingQueue.put("1");
System.out.println(Thread.currentThread().getName() + "\t put 2");
blockingQueue.put("2");
System.out.println(Thread.currentThread().getName() + "\t put 3");
blockingQueue.put("3");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "AAA").start();

new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + "\t" + blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "BBB").start();
}
}

LockSupport与线程中断

线程中断机制

一个线程不应该由其他线程来强制中断或停止,而是应该由线程自己自行停止,所以,Thread.stop、Thread.suspend、Thread.resume都已经被废弃了

在Java中没有办法立即停止一条线程,然而停止线程却显得尤为重要,如取消一个耗时操作。因此,Java提供了一种用于停止线程的协商机制-中断,也即中断标志协商机制

中断只是一种协作机制,Java没有给中断增加任何语法,中断的过程完全需要程序员自己实现

若要中断一个线程,你需要手动调用该线程的interrupt方法,该方法也仅仅是将线程对象的中断标识设为true

每个线程对象中都有一个标识,用于标识线程是否被中断;该标识位为true表示中断,为false表示未中断;通过调用线程对象的interrupt方法将线程的标识位设为true;可以在别的线程中调用,也可以在自己的线程中调用

public void interrupt()实例方法

interrupt()仅仅是设置线程的中断状态为true,发起一个协商而不会立即停止线程

public static boolean interrupted()静态方法

判断当前线程是否被中断并清除当前中断状态

做了两件事:

  1. 返回当前线程的中断状态,测试当前线程是否己被中断
  2. 将当前线程的中断状态清零并重新设为false,清除线程的中断状态

如果连续两次调用此方法,第一次会返回true,然后这个方法会将中断标识位设置位false,所以第二次调用将返回false

如果线程处于被阻塞状态(例如处于 sleep, wait, join 等状态),在别的线程中调用当前线程对象的interrupt方法,那么线程将立即退出被阻塞状态,并抛出一个InterruptedException异常。

public boolean isInterrupted()实例方法

判断当前线程是否被中断(通过检查中断标识位)

比较静态方法interrupted和实例方法isInterrupted

  1. 静态方法interrupted将会清除中断状态(传入的参数ClearInterrupted位true)
  2. 实例方法isInterrupted则不会(传入的参数ClearInterrupted为false)
1
2
3
4
5
6
7
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
public boolean isInterrupted() {
return isInterrupted(false);
}
private native boolean isInterrupted(boolean ClearInterrupted);

如何使用中断标识停止线程

  • 在需要中断的线程中不断监听中断状态,一旦发生中断,就执行型对于的中断处理业务逻辑
  • 三种中断标识停止线程的方式
    1. 通过一个volatile变量实现
    2. 通过AtomicBoolean
    3. 通过Thread类自带的中断API方法实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package org.monochrome.interrupt;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* @author monochrome
* @date 2022/11/27
*/
public class InterruptDemo {

static volatile boolean stopped = false;
static AtomicBoolean atomicBoolean = new AtomicBoolean(false);

public static void main(String[] args) {
// interruptByVolatile();
// interruptByAtomicBoolean();
interruptByThreadAPI();
}

private static void interruptByVolatile() {
new Thread(() -> {
while (true) {
if (stopped) {
System.out.println(Thread.currentThread().getName() + "\t stopped is modified to " + stopped);
break;
}
System.out.println("----hello volatile ");
}
}, "t1").start();
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t modifies stopped to true");
stopped = true;
}, "t2").start();
}

private static void interruptByAtomicBoolean() {
new Thread(() -> {
while (true) {
if (atomicBoolean.get()) {
System.out.println(Thread.currentThread().getName() + "\t atomicBoolean is modified to " + atomicBoolean.get());
break;
}
System.out.println("----hello atomicBoolean ");
}
}, "t1").start();
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t modifies atomicBoolean to true");
atomicBoolean.set(true);
}, "t2").start();
}

/**
* 在需要中断的线程中不断监听中断状态,一旦发生中断,就执行相应的中断处理业务逻辑stop线程
*/
private static void interruptByThreadAPI() {
Thread t1 = new Thread(() -> {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println(Thread.currentThread().getName() + " isInterrupted() is true!");
break;
}
System.out.println(Thread.currentThread().getName() + " hello interrupt api");
}
}, "t1");
t1.start();
try {
TimeUnit.MILLISECONDS.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " interrupts t1");
t1.interrupt();
}, "t2").start();
}
}

sleep方法抛出InterruptedException后,中断标识也被清空置为false,我们在catch没有通过调用th.interrupt()方法再次将中断标识位设置为true,这就导致无限循环了

等待唤醒机制

三种让线程等待和唤醒的方法

  • 方式1:使用Object中的wait()方法让线程等待,使用Object中的notify()方法唤醒线程
  • 方式2:使用JUC包中Condition的await()方法让线程等待,使用signal()方法唤醒线程
  • 方式3:LockSupport类可以阻塞当前线程以及唤醒指定被阻塞的线程

前面两种方式有限制条件:

  1. 线程先要获得并持有锁,必须在锁块(synchronized或lock)中
  2. 必须要先等待后唤醒,线程才能够被唤醒

Demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class LockSupportDemo {

public static void main(String[] args) {
// waitAndNotify();
awaitAndSignal();
}

private static void awaitAndSignal() {
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " come in");
condition.await();
System.out.println(Thread.currentThread().getName() + " was signaled");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}

}, "t1").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " signal");
condition.signal();
} finally {
lock.unlock();
}

}, "t2").start();
}

private static void waitAndNotify() {
Object objectLock = new Object();
new Thread(() -> {
synchronized (objectLock) {
System.out.println(Thread.currentThread().getName() + " come in");
try {
objectLock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " was notified");
}
}, "t1").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
new Thread(() -> {
synchronized (objectLock) {
objectLock.notify();
System.out.println(Thread.currentThread().getName()+" notify");
}
}, "t2").start();
}

}

LockSupport

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。

LockSupport类使用了一种名为Permit(许可)的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可(permit)。

可以把许可看成是一种(0,1)信号量(Semaphore),但与 Semaphore 不同的是,许可的累加上限是1。

park()/park(Object blocker)阻塞方法

permit默认是0,所以一开始调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设置为1时,park方法会被唤醒,然后会将permit再次设置为0并返回

static void park():底层是Unsafe类的native方法

unpark(Thread thread唤醒方法

调用unpark(thread)方法后,就会将thread线程的许可permit设置成1(注意多次调用unpark方法,不会累加,permit值还是1)会自动唤醒thread线程,即之前阻塞中的LockSupport.park()方法会立即返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class LockSupportDemo2 {

public static void main(String[] args) {
Thread t1 = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " come in");
LockSupport.park();
System.out.println(Thread.currentThread().getName() + " was unparked");
}, "t1");
t1.start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
new Thread(() -> {
LockSupport.unpark(t1);
System.out.println(Thread.currentThread().getName()+" unparks t1");
}, "t2").start();

}
}

LockSupport解决的痛点

  • LockSupport不用持有锁块,不用加锁,程序性能好
  • 先后顺序,不容易导致卡死(因为unpark获得了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞)

为什么可以先唤醒线程后阻塞线程?

因为unpark获得了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞

为什么唤醒两次后阻塞两次,但最终结果还会阻塞线程?

因为凭证的数量最多为1,连续调用两次unpark和调用一次unpark效果一样,只会增加一个凭证;而调用两次park却需要消费两个凭证,证不够,不能放行

Java内存模型(JMM)

JMM(Java内存模型-Java Memory Model)本身是一种抽象的概念,并不真实存在,它描述的是一组规则或规范,通过这组规范定义了程序中(尤其是多线程)各个变量(包括实例字段,静态字段和构成数组对象的元素)的读写访问方式并决定一个线程对共享变量的写入何时以及如何变成对另一个线程可见,关键技术点都是围绕多线程的原子性、可见性和有序性展开的。

作用:

  1. 通过JMM来实现线程和主内存之间的抽象关系
  2. 屏蔽各个硬件平合和操作系统的内存访问差异以实现让Java程序在各种平台下都能达到一致的内存访问效果。

为什么会推导出JMM模型呢?

  1. 因为有这么多级的缓存(cpu和物理主内存的速度不一致的),CPU的运行并不是直接操作内存而是先把内存里边的数据读到缓存,而内存的读和写操作的时候就会造成不一致的问题
  2. Java虚拟机规范中试图定义一种Java内存模型来屏蔽掉各种硬件和操作系统的内存访问差异,以实现让Java程序在各种平台下都能达到一致的内存访问效果

数据同步八大原子操作

一个变量如何从主内存拷贝到工作内存、如何从工作内存同步到主内存之间的实现细节,Java内存模型定义了以下八种操作来完成:

  1. lock(锁定):作用于主内存的变量,把一个变量标记为一条线程独占状态
  2. unlock(解锁):作用于主内存的变量,把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
  3. read(读取):作用于主内存的变量,把一个变量值从主内存传输到线程的工作内存中,以便随后的load动作使用
  4. load(载入):作用于工作内存的变量,它把read操作从主内存中得到的变量值放入工 作内存的变量副本中
  5. use(使用):作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎
  6. assign(赋值):作用于工作内存的变量,它把一个从执行引擎接收到的值赋给工作内存的变量
  7. store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的write的操作
  8. write(写入):作用于工作内存的变量,它把store操作从工作内存中的一个变量的值传送到主内存的变量中

如果要把一个变量从主内存中复制到工作内存中,就需要按顺序地执行read和load操作,如果把变量从工作内存中同步到主内存中,就需要按顺序地执行store和write操作。但Java内存模型只要求上述操作必须按顺序执行,而没有保证必须是连续执行

数据同步八大原子操作过程

JVMM规范下三大特性

可见性

指当一个线程修改了某一个共享变量的值,其他线程是否能够立即知道该变更,JVMM规定了所有的变量都存储在主内存中

假设有A、B两个线程同时去操作主物理内存的共享数据number=0,A抢到CPU执行权,将number刷新到自己的工作内存,这个时候进行number++的操作,这个时候number=1,将A中的工作内存中的数据刷新到主物理内存,这个时候,马上通知B,B重新拿到最新值number=1刷新B的工作内存中

Java中普通的共享变量不保证可见性,因为数据修改被写入内存的时机是不确定的,多线程并发很可能出现”脏读”,所以每个线程都有自己的工作内存,线程自己的工作内存中保存了该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作(读取、赋值等)都必须在线程自己的工作内存中进行,而不能直接读写主内存中的变量。不同线程之间也无法直接访问对工作内存中的变量,线程间变量值的传递均需要通过主内存来完成

volatile可以解决可见性(能否及时看到)

原子性

指一个操作是不可中断的,即多线程坏境下,操作不能被其他线程干扰

有序性

计算机在执行程序时,为了提高性能,编译器和处理器常常会做指令重排,一把分为以下3种

JMM有序性指令重排

  • 单线程坏境里面确保程序最终执行结果和代码顺序执行的结果一致
  • 处理器在进行重新排序是必须要考虑指令之间的数据依赖性
  • 多线程坏境中线程交替执行,由于编译器优化重排的存在,两个线程使用的变量能否保持一致是无法确认的,结果无法预测

JVMM规范下多线程对变量的读写过程

由于JVM运行程序的实体是线程,而每个线程创建时JVM都会为其创建一个工作内存(有些地方称为栈空间),工作内存是每个线程的私有数据区域,而Java内存模型中规定所有的变量都存储在主内存,主内存是共享内存区域,所有线程都可以访问,但线程对变量的操作(读取赋值等)必须在工作内存中进行,首先要将变量从主内存拷贝到线程自己的工作内存空间,然后对变量进行操作,操作完成后将变量写回主内存,不能直接操作主内存中的变量,各个线程的工作内存中存储着主内存中的变量副本拷贝,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成

JMM读取过程

JMM定义了线程和主内存之间的抽象关系

  1. 线程之间的共享变量存储在主内存中(从硬件角度来说就是内存条)
  2. 每个线程都有一个私有的本地工作内存,本地工作内存中存储了该线程用来读/写共享变量的副本(从硬件角度来说就是CPU的缓存,比如寄存器、L1、L2、L3缓存等)

总结:

  • 我们定义的所有共享变量都储存在物理主内存
  • 每个线程都有自己独立的工作内存,里面保存该线程使用到的变量的副本(主内存中该变量的一份拷贝)
  • 线程对共享变量所有的操作都必须先在线程自己的工作内存中进行后写回主内存,不能直接从主内存中读写(不能越级)
  • 不同线程之间也无法直接访问其他线程的工作内存中的变量,线程间变量值的传递需要通过主内存来进行(同级不能相互访问)

happens-before总原则

  • 如果一个操作happens-before另一个操作,那么第一个操作的执行结果对第二个操作可见,而且第一个操作的执行顺序排在第二个操作之前(可见性,有序性)
  • 两个操作之间存在happens-before关系,并不意味着一定要按照happens-before原则制定的顺序来执行。如果重排序之后的执行结果与按照happens-before关系来执行的结果一致,那么这种重排序并不非法(可以指令重排)

volatile与JMM

volatile2大特点

可见性、有序性、不保证原子性

内存语义

  • 当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值立即刷新回主内存中
  • 当读一个volatile变量时,JMM会把该线程对应的本地内存设置为无效,直接从主内存中读取共享变量
  • 所以volatile的写内存语义是直接刷新到主内存中,读的内存语义是直接从主内存中读取

volatile凭什么可以保证有序性和可见性

靠的是内存屏障,内存屏障分为 LoadLoad、StoreLoad、LoadStore、StoreStore

内存屏障

内存屏障(也称内存栅栏,内存栅障,屏障指令等,是一类同步屏障指令,是CPU或编译器在对内存随机访问的操作中的一个同步点,使得此点之前的所有读写操作都执行后才可以开始执行此点之后的操作),避免代码重排序。内存屏障其实就是一种JVM指令,Java内存模型的重排规则会要求Java编译器在生成JVM指令时插入特定的内存屏障指令,通过这些内存屏障指令,volatile实现了Java内存模型中的可见性和有序性,但volatile无法保证原子性

内存屏障之前的所有写操作都要回写到主内存

内存屏障之后的所有读操作都能获得内存屏障之前的所有写操作的最新结果(实现了可见性)

  • 写屏障(Store Memory Barrier):告诉处理器在写屏障之前将所有存储在缓存(Store bufferes)中的数据同步到主内存。也就是说当看到Store屏障指令,就必须把该指令之前所有写入指令执行完毕才能继续往下执行。
  • 读屏障(Load Memory Barrier):处理器在读屏障之后的读操作,都在读屏障之后执行。也就是说在Load屏障指令之后就能够保证后面的读取数据指令一定能够读取到最新的数据。

因此重排序时,不允许把内存屏障之后的指令重排序到内存屏障之前。一句话:对一个volatile变量的写, 先行发生于任意后续对这个volatile变量的读,也叫写后读。

四大屏障

屏障类型 指令示例 说明
LoadLoad Load1;LoadLoad;Load2 保证load1的读取操作在load2及后续读取操作之前执行
StoreStore Store1;StoreStore;Store2 在store2及其后的写操作执行前,保证store1的写操作已刷新到主内存
LoadStore Load1;LoadStore;Store2 在store2及其后的写操作执行前,保证load1的读操作已读取结束
StoreLoad Store1;StoreLoad;Load2 保证store1的写操作已刷新到主内存之后,load2及其后的读操作才能执行

内存屏障插入策略

volatile写

在每个volatile写操作的前⾯插⼊⼀个StoreStore屏障
在每个volatile写操作的后⾯插⼊⼀个StoreLoad屏障

volatile写加内存屏障

volatile读

在每个volatile读操作的后⾯插⼊⼀个LoadLoad屏障
在每个volatile读操作的后⾯插⼊⼀个LoadStore屏障

volatile读加内存屏障

代码展示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//模拟一个单线程,什么顺序读?什么顺序写?
public class VolatileTest {
int i = 0;
volatile boolean flag = false;
public void write(){
i = 2;
flag = true;
}
public void read(){
if(flag){
System.out.println("---i = " + i);
}
}
}

i内存屏障插入策略

在哪些地方可以使用volatile?

  • 单一赋值可以,but含复合运算赋值不可以(i++之类)
  • 状态标志,判断业务是否结束
  • 开销较低的读,写锁策略
  • 单例模式,DCL双端检锁的发布

JVM-JMM-CPU底层执行全过程

JVM(内存中)是基于栈的指令集架构,比如我们去执行一个运算的操作,最终是由CPU执行的

  1. 比如sconst_0这个指令会交给执行引擎进行翻译,解释执行器或JLT转换为汇编

  2. 汇编指令会转化为二进制

  3. 在二进制下面是线程A,需要这个线程作为载体

  4. cpu不是马上执行,而是CPU调度到线程A才执行线程A的代码

  5. KLT模式,JVM创建一个线程,底层会维护一个线程表,而这个线程与JVM中的线程是一一对应的关系

JVM-JMM-CPU底层执行全过程

缓存一致性协议

变量加了volatile关键字,在汇编会有一个lock锁前缀(触发硬件缓存锁机制)
硬件缓存锁机制包含总线锁、缓存一致性协

早期技术落后,使用总线保持缓存一致

例子: 早期可能CPU还没有三级缓存,t1、t2两个线程(多核)对主内存中的数据进行修改,如果某一个时刻,t1线程拿到了CPU执行权,在写回到主内存去的时候,会将总线锁抢占,抢占后t2线程就没办法去进行写入的操作,早期的这种使用总线锁的效率很低,它只能保证一个线程去写,这样多核的也就没办法发挥写操作

缓存一致性协议(最经典的是MESI协议)

mesi 在硬件约定了这样一种机制,CPU启动后,会采用一种监听模式,一直去监听总线里面消息的传递,也就是说,有任何人通过总线从内存中拿了一点东西,只要你被lock前缀修饰了,都可以感知到

Modified、Exclusive、Shared、Invalid

  1. 例如我们对主内存的数据x=0,t1线程进行赋值x=3,t2线程进行赋值x=5的操作
  2. 首先t1线程将x=0从内存–总线–读到三级缓存中,放入缓存行中存储,这时状态是E(独享的)
  3. t2线程也将x=0从内存–总线–读到三级缓存中,放入缓存行中存储,这时的状态是S(共享的),而t1线程读取到的也从E–S
  4. 这个时候t1将数据从3级缓存读到L2—L1中,t2线程也是如此
  5. 如果这个时候(情况一),这个时候t1上锁的话,那么会将t1的L1的缓存行锁住,然后将x=3(E-S-M),在写的同时,发出一个通知去告诉t2线程,这个时候t2线程就会将变量置为无效(S-I),也发出一个通知去通知线程t1的cpu,告诉它我这里置为无效了,读取到t1线程的x=3。至于什么时候t1线程将值写入主内存的时机是不确定的
  6. 如果这个时候(情况二),线程t1和线程t2同时都锁住了各自L3中的缓存行,这个时候,我们到底是执行谁的结果呢?这个时候由总线裁决,看执行谁的操作,是x=3还是x=5
  7. 总线裁决:通过总线上面电路的高低电位,每一个cpu都有自己的时钟周期
  8. 情况三:如果变量很大,我们一个缓存行存不进去,这个时候MESI就会失效,会降级到总线的机制

CAS

CAS的全称为Compare-And-Swap ,它是一条CPU并发原语,比较工作内存值(预期值)和主物理内存的共享值是否相同,相同则执行规定操作,否则继续比较直到主内存和工作内存的值一致为止。这个过程是原子的。

例如AtomicInteger类主要利用CAS(compare and swap)+volatile和native方法来保证原子操作,从而避免synchronized的高开销,执行效率大为提

CAS并发原语

CAS是JDK提供的非阻塞原子性操作,它通过硬件保证了比较-更新的原子性。

它是非阻塞的且自身具有原子性,也就是说这玩意效率更高且通过硬件保证,说明这玩意更可靠。

CAS是一条CPU的原子指令(cmpxchg指令),不会造成所谓的数据不一致问题,Unsafe提供的CAS方法(如compareAndSwapXXX)底层实现即为CPU指令cmpxchg。

执行cmpxchg指令的时候,会判断当前系统是否为多核系统,如果是就给总线加锁,只有一个线程会对总线加锁成功,加锁成功之后会执行cas操作,也就是说CAS的原子性实际上是CPU实现独占的,比起用synchronized重量级锁,这里的排他时间要短很多, 所以在多线程情况下性能会比较好。

UnSafe类

Unsafe是CAS的核心类,由于Java 方法无法直接访问底层 ,需要通过本地(native)方法来访问,UnSafe相当于一个后门,基于该类可以直接操作特定的内存数据。UnSafe类在于sun.misc包中,其内部方法操作可以向C的指针一样直接操作内存,因为Java中CAS操作依赖于UnSafe类的方法。由于CAS是一种系统原语,原语属于操作系统用于范畴,是由若干条指令组成,用于完成某个功能的一个过程,并且原语的执行必须是连续的,在执行过程中不允许中断,也即是说CAS是一条原子指令,不会造成所谓的数据不一致的问题

注意:UnSafe类中所有的方法都是native修饰的,也就是说UnSafe类中的方法都是直接调用操作底层资源执行响应的任务

unsafe.getAndIncrement()

unsafe.getAndIncrement()

变量valueOffset,便是该变量在内存中的偏移地址,因为UnSafe就是根据内存偏移地址获取数据的

变量value用volatile修饰,保证了多线程之间的可见性

自旋锁

自旋锁是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,当线程发现锁被占用时,会不断循环判断锁的状态,直到获取。这样的好处是减少线程上下文切换的消耗,缺点是循环会消耗CPU

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 题月:实现一个自旋锁
* 自旋锁好处:循环比较获取没有类似wait的阻塞。
* 通这CAS操作完成自旋锁,A线程先进来调用lock方法自己持有锁5秒的,B随后进来后发现
* 当前线程持有锁,所以只能通过自旋等待,直到A释放锁后B随后抢到。
* @author monochrome
* @date 2022/11/30
*/
public class SpinLockDemo {

AtomicReference<Thread> threadAtomicReference = new AtomicReference<>();

public void lock() {
Thread thread = Thread.currentThread();
System.out.println(thread.getName() + "---come in");
while (!threadAtomicReference.compareAndSet(null, thread)) {
}
}

public void unlock() {
Thread thread = Thread.currentThread();
threadAtomicReference.compareAndSet(thread, null);
System.out.println(thread.getName() + "---task over, unlock");
}

public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();
new Thread(() -> {
spinLockDemo.lock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
spinLockDemo.unlock();
}, "A").start();
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
new Thread(() -> {
spinLockDemo.lock();
spinLockDemo.unlock();
}, "B").start();

}
}

CAS缺点

  1. 循环时间长开销很大

    1. 我们可以看到getAndInt方法执行时,有个do while
    2. 如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销
  2. 只能保证一个共享变量的原子性

    1. 当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作
    2. 对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁来保证原子性
  3. 引出来ABA问题

    比如一个线程one从内存位置V中取出A,这时候另一个线程two也从内存中取出A,并且线程two进行了一些操作将值变成了B,然后线程two又将V位置的数据变成A,这时候线程one进行CAS操作发现内存中仍然是A,然后线程one操作成功。尽管线程one的CAS操作成功,但是不代表这个过程就是没问题的

ABA问题解决方案

ABA问题解决方案是使用 AtomicStampedReference,每修改一次都会有一个版本号

atomic(原子类)

基本类型原子类(AtomicInteger、AtomicBoolean、AtomicLong)

常用API简介

方法 解释
public final int get() 获取当前的值
public final int getAndSet(int newValue) 获取到当前的值,并设置新的值
public final int getAndIncrement() 获取当前的值,并自增
public final int getAndDecrement() 获取到当前的值,并自减
public final int getAndAdd(int delta) 获取到当前的值,并加上预期的值
public final int incrementAndGet() 返回的是加1后的值
boolean compareAndSet(int expect,int update) 如果输入的数值等于预期值,返回true

数组类型原子类 (AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray)

引用类型原子类 (AtomicReference、AtomicStampedReference、AtomicMarkableReference)

引用类型原子类主要有三个:AtomicReference、AtomicStampedReference、AtomicMark ableReference

AtomicStampedReference

解决ABA问题

  1. 携带版本号的引用类型原子类,可以解决ABA问题
  2. 解决修改过几次
  3. 状态戳原子引用

AtomicMarkableReference

  1. 原子更新带有标志位的引用类型对象
  2. 解决是否修改(它的定义就是将状态戳简化为true|false),类似一次性筷子
  3. 状态戳(true/false)原子引用
  4. 不建议用它解决ABA问题

AtomicStampedReference和AtomicMarkableReference区别

  1. Stamped – version number 版本号,修改一次+1
  2. Markable – true、false 是否修改过

对象的属性修改原子类 (AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater)

原子更新int类型/long类型/引用类型字段的值

使用目的

以一种线程安全的方式操作非线程安全对象内的某些字段

是否可以不要锁定整个对象,减少锁定的范围,只关注长期、敏感性变化的某一个字段,而不是整个对象,已达到精确加锁+节约内存的目的

使用要求

更新的对象属性必须使用public volatile修饰符

因为对象的属性修改类型原子类都是抽象类,所以每次使用都必须使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性

Demo

AtomicIntegerFieldUpdater

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package org.monochrome.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
* 以一种线程安全的方式操作非线程安全对象的某些字段。
* 需求:
* 10个线程,
* 每个线程转账1000,
* 不使用synchronized,尝武使用AtomicIntegerFieldUpdater来实现。
* @author monochrome
* @date 2022/12/2
*/
public class AtomicIntegerFieldUpdaterDemo {

final static int SIZE = 10;
static CountDownLatch countDownLatch = new CountDownLatch(SIZE);

public static void main(String[] args) throws InterruptedException {
BankAccount bankAccount = new BankAccount();

for (int i = 0; i < SIZE; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 1000; j++) {
bankAccount.safeAdd(bankAccount, 1);
}
} finally {
countDownLatch.countDown();
}
}, "t" + i).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + " money:" + bankAccount.money);

}

}

class BankAccount {

String bankName = "CCB";
public volatile int money = 0;

// 因为对象的属性修改类型原子类都是抽象类,所以每次使用都必领
// 使用静态方法newUpdater()创建一个更新器,并且需要设置想要更新的类和属性。
static final AtomicIntegerFieldUpdater<BankAccount> fieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(BankAccount.class, "money");

public synchronized void add() {
money++;
}

public synchronized void safeAdd(BankAccount bankAccount, int delta) {
fieldUpdater.getAndAdd(bankAccount, delta);
}

}

AtomicReferenceFieldUpdater

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package org.monochrome.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.stream.IntStream;

/**
* @author monochrome
* @date 2022/12/2
*/
public class AtomicReferenceFieldUpdaterDemo {

static Resource resource = new Resource();
final static int SIZE = 10;
static CountDownLatch countDownLatch = new CountDownLatch(SIZE);

public static void main(String[] args) throws InterruptedException {

IntStream.range(0, SIZE).parallel().forEach(value -> {
new Thread(() -> {
try {
resource.init(resource);
} finally {
countDownLatch.countDown();
}
}, "t" + value).start();
});
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + " finish");
}

}

class Resource {

public volatile Boolean init = Boolean.FALSE;

static final AtomicReferenceFieldUpdater<Resource, Boolean> referenceFieldUpdater =
AtomicReferenceFieldUpdater.newUpdater(Resource.class, Boolean.class, "init");

public void init(Resource resource) {
if (referenceFieldUpdater.compareAndSet(resource, Boolean.FALSE, Boolean.TRUE)) {
System.out.println(Thread.currentThread().getName() + " starts init");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " ends init");
} else {
System.out.println(Thread.currentThread().getName()+"----there is a thread to start init");
}
}

}

原子操作增强类(DoubleAccumulator 、DoubleAdder 、LongAccumulator 、LongAdder)

LongAdder和LongAccumulator区别

  1. LongAdder只能用来计算加法、减法,且从零开始计算
  2. LongAccumulator提供了自定义的函数操作

Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package org.monochrome.atomic;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

/**
* 需求:50个线程,每个线程100w次,总点费数出夹
* @author monochrome
* @date 2022/12/2
*/
public class ClickNumberCompareDemo {


static NumberClick numberClick = new NumberClick();
static final int THREAD_NUM = 50;
static final int TIMES = 1000000;

public static void main(String[] args) throws InterruptedException {
clickBySynchronized();
clickByAtomicLong();
clickByLongAdder();
clickByLongAccumulator();
}

private static void clickByLongAccumulator() throws InterruptedException {
long startTime = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
IntStream.range(0, THREAD_NUM).parallel().forEach((value) -> {
new Thread(() -> {
try {
for (int i = 0; i < TIMES; i++) {
numberClick.clickByLongAccumulator();
}
} finally {
countDownLatch.countDown();
}
}, "t" + value).start();
});
countDownLatch.await();
long endTime = System.currentTimeMillis();
System.out.println("----LongAccumulator total click: " + numberClick.longAccumulator.get());
System.out.println("----LongAccumulator cost Time: " + (endTime - startTime) + "ms");
}
private static void clickByLongAdder() throws InterruptedException {
long startTime = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
IntStream.range(0, THREAD_NUM).parallel().forEach((value) -> {
new Thread(() -> {
try {
for (int i = 0; i < TIMES; i++) {
numberClick.clickByLongAdder();
}
} finally {
countDownLatch.countDown();
}
}, "t" + value).start();
});
countDownLatch.await();
long endTime = System.currentTimeMillis();
System.out.println("----LongAdder total click: " + numberClick.longAdder.sum());
System.out.println("----LongAdder cost Time: " + (endTime - startTime) + "ms");
}

private static void clickByAtomicLong() throws InterruptedException {
long startTime = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
IntStream.range(0, THREAD_NUM).parallel().forEach((value) -> {
new Thread(() -> {
try {
for (int i = 0; i < TIMES; i++) {
numberClick.clickByAtomicLong();
}
} finally {
countDownLatch.countDown();
}
}, "t" + value).start();
});
countDownLatch.await();
long endTime = System.currentTimeMillis();
System.out.println("----AtomicLong total click: " + numberClick.atomicLong.get());
System.out.println("----AtomicLong cost Time: " + (endTime - startTime) + "ms");
}

private static void clickBySynchronized() throws InterruptedException {
long startTime = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
IntStream.range(0, THREAD_NUM).parallel().forEach((value) -> {
new Thread(() -> {
try {
for (int i = 0; i < TIMES; i++) {
numberClick.clickBySynchronized();
}
} finally {
countDownLatch.countDown();
}
}, "t" + value).start();
});
countDownLatch.await();
long endTime = System.currentTimeMillis();
System.out.println("----synchronized total click: " + numberClick.number);
System.out.println("----synchronized cost Time: " + (endTime - startTime) + "ms");
}

}

class NumberClick {

int number = 0;
AtomicLong atomicLong = new AtomicLong();
LongAdder longAdder = new LongAdder();
LongAccumulator longAccumulator = new LongAccumulator(Long::sum, 0);
public synchronized void clickBySynchronized() {
number++;
}

public void clickByAtomicLong() {
atomicLong.getAndIncrement();
}

public void clickByLongAdder() {
longAdder.increment();
}
public void clickByLongAccumulator() {
longAccumulator.accumulate(1);
}

}

LongAdder原理

LongAdder的引入、原理、能否代替AtomicLong

  1. AtomicLong是利用底层的CAS操作来提供并发性的,比如addAndGet方法,该方法是一个native方法,它的逻辑是采用自旋的方式不断更新目标值,直到更新成功。(也即乐观锁的实现模式)
  2. 在并发量比较低的情况下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发情况下,N个线程同时进行自旋操作,N-1个线程失败,导致CPU打满场景,此时AtomicLong的自旋会成为瓶颈
  3. 这就是LongAdder引入的初衷——解决高并发环境下AtomictLong的自旋瓶颈问题
1
2
3
public final long addAndGet(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
}

LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果(分散热点)

LongAdder能否替代AtomicLong?

  1. LongAdder的API和AtomicLong的API还是有比较大的差异,而且AtomicLong提供的功能更丰富,尤其是addAndGet、decrementAndGet、compareAndSet这些方法。addAndGet、decrementAndGet除了单纯的做自增自减外,还可以立即获取增减后的值,而LongAdder则需要做同步控制才能精确获取增减后的值。如果业务需求需要精确的控制计数,则使用AtomicLong比较合适;

  2. 低并发、一般的业务尝尽下AtomicLong(数据准确)是足够了,如果并发量很多,存在大量写多读少的情况,那LongAdder(数据最终一致性,不保证强一致性)可能更合适

Striped64

Striped64有几个比较重要的成员函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//CPU数量,即Cells数组的最大长度
static final int NCPU = Runtime.getRuntime().availableProcessors();
//存放Cell的hash表,大小为2的幂
//这里的Cell是Striped64的内部类
transient volatile Cell[] cells;
/*
1.在开始没有竞争的情况下,将累加值累加到base;
2.在cells初始化的过程中,cells处于不可用的状态,这时候也会尝试将通过cas操作值累加到base
*/
transient volatile long base;
/*
cellsBusy,它有两个值0或1,它的作用是当要修改cells数组时加锁,
防止多线程同时修改cells数组(也称cells表),0为无锁,1位加锁,加锁的状况有三种:
(1). cells数组初始化的时候;
(2). cells数组扩容的时候;
(3).如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候;

*/
transient volatile int cellsBusy;

Striped64中一些变量或者方法的定义

  • base:类似于AtomicLong中全局的value值。再没有竞争情况下数据直接累加到base上,或者cells扩容时,也需要将数据写入到base上
  • collide:表示扩容意向,false一定不会扩容,true可能会扩容
  • cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁
  • casCellsBusy:通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true
  • NCPU:当前计算机CPU数量,Cell数组扩容时会使用到
  • getProbe():获取当前线程的hash值
  • advanceProbe():重置当前线程的hash值

LongAdder为什么这么快呢?(分散热点)

  1. LongAdder在无竞争的情况,跟AtomicLong一样,对同一个base进行操作,当出现竞争关系时则采用化整为零的做法,从空间换时间,用一个数组cells,将一个value拆分进这个数组cells。多个线程需要同时对value进行操作时候,可以对线程id进行hash得到hash值,再根据hash值映射到这个数组cells的某个下标,再对该下标所对应的值进行自增操作。当所有线程操作完毕,将数组cells的所有值和无竞争值base都加起来作为最终结果(分散热点)
  2. sum()会将所有cell数组中的value和base累加作为返回值,核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点

内部是一个Base+一个Cell[ ]数组

base变量:非竞争状态条件下,直接累加到该变量上
Cell[ ]数组:竞争条件下(高并发下),累加各个线程自己的槽Cell[i]中

LongAdder分散热点

源码解析 longAdder.increment( )

add(1L)

  1. 最初无竞争时,直接通过casBase进行更新base的处理
  2. 如果更新base失败后,首次新建一个Cell[ ]数组(默认长度是2)
  3. 当多个线程竞争同一个Cell比较激烈时,可能就要对Cell[ ]扩容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void add(long x) {
//as是striped64中的cells数组属性
//b是striped64中的base属性
//v是当前线程hash到的cell中存储的值
//m是cells的长度减1,hash时作为掩码使用
//a时当前线程hash到的cell
Cell[] as; long b, v; int m; Cell a;
/*
首次首线程(as = cells) != null)一定是false,此时走casBase方法,以CAS的方式更新base值,
且只有当cas失败时,才会走到if中
条件1:cells不为空,说明出现过竞争,cell[]已创建
条件2:cas操作base失败,说明其他线程先一步修改了base正在出现竞争
*/
if ((as = cells) != null || !casBase(b = base, b + x)) {
//true无竞争 false表示竞争激烈,多个线程hash到同一个cell,可能要扩容
boolean uncontended = true;
/*
条件1:cells为空,说明正在出现竞争,上面是从条件2过来的,说明!casBase(b = base, b + x))=true
会通过调用longAccumulate(x, null, uncontended)新建一个数组,默认长度是2
条件2:默认会新建一个数组长度为2的数组,m = as.length - 1) < 0 应该不会出现,
条件3:当前线程所在的cell为空,说明当前线程还没有更新过cell,应初始化一个cell。
a = as[getProbe() & m]) == null,如果cell为空,进行一个初始化的处理
条件4:更新当前线程所在的cell失败,说明现在竞争很激烈,多个线程hash到同一个Cell,应扩容
(如果是cell中有一个线程操作,这个时候,通过a.cas(v = a.value, v + x)可以进行处理,返回的结果是true)
**/
if (as == null || (m = as.length - 1) < 0 ||
//getProbe( )方法返回的时线程中的threadLocalRandomProbe字段
//它是通过随机数生成的一个值,对于一个确定的线程这个值是固定的(除非刻意修改它)
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x))) {
//调用Striped64中的方法处理
longAccumulate(x, null, uncontended);
}
}
}

longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
//存储线程的probe值
int h;
//如果getProbe()方法返回0,说明随机数未初始化
if ((h = getProbe()) == 0) { //这个if相当于给当前线程生成一个非0的hash值
//使用ThreadLocalRandom为当前线程重新计算一个hash值,强制初始化
ThreadLocalRandom.current(); // force initialization
//重新获取probe值,hash值被重置就好比一个全新的线程一样,所以设置了wasUncontended竞争状态为true
h = getProbe();
//重新计算了当前线程的hash后认为此次不算是一次竞争,都未初始化,肯定还不存在竞争激烈,wasUncontended竞争状态为true
wasUncontended = true;
}
//如果hash取模映射得到的Cell单元不是null,则为true,此值也可以看作是扩容意向
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) { // CASE1:cells已经初始化了
// 当前线程的hash值运算后映射得到的Cell单元为null,说明该Cell没有被使用
if ((a = as[(n - 1) & h]) == null) {
//Cell[]数组没有正在扩容
if (cellsBusy == 0) { // Try to attach new Cell
//先创建一个Cell
Cell r = new Cell(x); // Optimistically create
//尝试加锁,加锁后cellsBusy=1
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j; //将cell单元赋值到Cell[]数组上
//在有锁的情况下再检测一遍之前的判断
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;//释放锁
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
/*
wasUncontended表示cells初始化后,当前线程竞争修改失败
wasUncontended=false,表示竞争激烈,需要扩容,这里只是重新设置了这个值为true,
紧接着执行advanceProbe(h)重置当前线程的hash,重新循环
*/
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//说明当前线程对应的数组中有了数据,也重置过hash值
//这时通过CAS操作尝试对当前数中的value值进行累加x操作,x默认为1,如果CAS成功则直接跳出循环
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//如果n大于CPU最大数量,不可扩容,并通过下面的h=advanceProbe(h)方法修改线程的probe再重新尝试
else if (n >= NCPU || cells != as)
collide = false; //扩容标识设置为false,标识永远不会再扩容
//如果扩容意向collide是false则修改它为true,然后重新计算当前线程的hash值继续循环
else if (!collide)
collide = true;
//锁状态为0并且将锁状态修改为1(持有锁)
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
//按位左移1位来操作,扩容大小为之前容量的两倍
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
//扩容后将之前数组的元素拷贝到新数组中
rs[i] = as[i];
cells = rs;
}
} finally {
//释放锁设置cellsBusy=0,设置扩容状态,然后进行循环执行
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h = advanceProbe(h);
}
//CASE2:cells没有加锁且没有初始化,则尝试对它进行加锁,并初始化cells数组
/*
cellsBusy:初始化cells或者扩容cells需要获取锁,0表示无锁状态,1表示其他线程已经持有了锁
cells == as == null 是成立的
casCellsBusy:通过CAS操作修改cellsBusy的值,CAS成功代表获取锁,返回true,第一次进来没人抢占cell单元格,肯定返回true
**/
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
//是否初始化的标记
boolean init = false;
try { // Initialize table(新建cells)
// 前面else if中进行了判断,这里再次判断,采用双端检索的机制
if (cells == as) {
//如果上面条件都执行成功就会执行数组的初始化及赋值操作,Cell[] rs = new Cell[2]标识数组的长度为2
Cell[] rs = new Cell[2];
//rs[h & 1] = new Cell(x)表示创建一个新的cell元素,value是x值,默认为1
//h & 1 类似于我们之前hashmap常用到的计算散列桶index的算法,通常都是hash&(table.len-1),同hashmap一个意思
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//CASE3:cells正在进行初始化,则尝试直接在基数base上进行累加操作
//这种情况是cell中都CAS失败了,有一个兜底的方法
//该分支实现直接操作base基数,将值累加到base上,也即其他线程正在初始化,多个线程正在更新base的值
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}

longAccumulate过程

sum()

1
2
3
4
5
6
7
8
9
10
11
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

核心的思想就是将之前AtomicLong一个value的更新压力分散到多个value中去,从而降级更新热点

为啥高并发下sum的值不精确?
  1. sum执行时,并没有限制对base和cells的更新。所以LongAdder不是强一致性,它是最终一致性的
  2. 首先,最终返回的sum局部变量,初始被赋值为base,而最终返回时,很可能base已经被更新了,而此时局部变量sum不会更新,造成不一致
  3. 其次,这里对cell的读取也无法保证是最后一次写入的值。所以,sum方法在没有并发的情况下,可以获得正确的结果

关于AtomicLong和LongAdder区别

img

ThreadLocal

ThreadLocal本地线程变量,线程自带的变量副本(实现了每一个线程副本都有一个专属的本地变量,主要解决的就是让每一个线程绑定自己的值,自己用自己的,不跟别人争抢。通过使用get()和set()方法,获取默认值或将其值更改为当前线程所存的副本的值从而避免了线程安全的问题)

ThreadLocal API

  • protected T initialValue():initialValue():返回此线程局部变量的当前线程的”初始值”
    (对于initialValue()较为老旧,jdk1.8又加入了withInitial()方法)
  • static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier):创建线程局部变量
  • T get():返回当前线程的此线程局部变量的副本中的值
  • void set(T value):将当前线程的此线程局部变量的副本设置为指定的值
  • void remove():删除此线程局部变量的当前线程的值

ThreadLocal源码分析

Thread|ThreadLocal|ThreadLocalMap关系

Thread|ThreadLocal|ThreadLocalMap关系

Thread类中有一个ThreadLocal.ThreadLocalMap threadLocals = null的变量,这个ThreadLocal相当于是Thread类和ThreadLocalMap的桥梁,在ThreadLocal中有静态内部类ThreadLocalMap,ThreadLocalMap中有Entry数组
当我们为threadLocal变量赋值,实际上就是以当前threadLocal实例为key,值为value的Entry往这个threadLocalMap中存放
t.threadLocals = new ThreadLocalMap(this, firstValue) 如下这行代码,可以知道每个线程都会创建一个ThreadLocalMap对象,每个线程都有自己的变量副本。

set方法详解

  1. 首先获取当前线程,并根据当前线程获取一个Map
  2. 如果获取的Map不为空,则将参数设置到Map中(当前ThreadLocal的引用作为key)
  3. 如果Map为空,则给该线程创建Map,并设置初始值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* 设置当前线程对应的ThreadLocal的值
*
* @param value 将要保存在当前线程对应的ThreadLocal的值
*/
public void set(T value) {
// 获取当前线程对象
Thread t = Thread.currentThread();
// 获取此线程对象中维护的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
// 判断map是否存在
if (map != null)
// 存在则调用map.set设置此实体entry
map.set(this, value);
else
// 1)当前线程Thread 不存在ThreadLocalMap对象
// 2)则调用createMap进行ThreadLocalMap对象的初始化
// 3)并将 t(当前线程)和value(t对应的值)作为第一个entry存放至ThreadLocalMap中
createMap(t, value);
}

/**
* 获取当前线程Thread对应维护的ThreadLocalMap
*
* @param t the current thread 当前线程
* @return the map 对应维护的ThreadLocalMap
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

/**
* 创建当前线程Thread对应维护的ThreadLocalMap
*
* @param t 当前线程
* @param firstValue 存放到map中第一个entry的值
*/
void createMap(Thread t, T firstValue) {
//这里的this是调用此方法的threadLocal
t.threadLocals = new ThreadLocalMap(this, firstValue);
}

/**
* ThreadLocalMap构造器
*
* @param firstKey 本ThreadLocal实例(this)
* @param firstValue 要保存的线程本地变量
*/
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
//初始化table
table = new ThreadLocal.ThreadLocalMap.Entry[INITIAL_CAPACITY];
//计算索引(重点代码)
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
//设置值
table[i] = new ThreadLocal.ThreadLocalMap.Entry(firstKey, firstValue);
size = 1;
//设置阈值
setThreshold(INITIAL_CAPACITY);
}

get方法详解

  1. 先获取当前线程的ThreadLocalMap变量,如果存在则返回值,不存在则创建并返回初始值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* 返回当前线程中保存ThreadLocal的值
* 如果当前线程没有此ThreadLocal变量,
* 则它会通过调用{@link #initialValue} 方法进行初始化值
*
* @return 返回当前线程对应此ThreadLocal的值
*/
public T get() {
// 获取当前线程对象
Thread t = Thread.currentThread();
// 获取此线程对象中维护的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
// 如果此map存在
if (map != null) {
// 以当前的ThreadLocal 为 key,调用getEntry获取对应的存储实体e
ThreadLocalMap.Entry e = map.getEntry(this);
// 对e进行判空
if (e != null) {
@SuppressWarnings("unchecked")
// 获取存储实体 e 对应的 value值
// 即为我们想要的当前线程对应此ThreadLocal的值
T result = (T) e.value;
return result;
}
}
/*
初始化 : 有两种情况有执行当前代码
第一种情况: map不存在,表示此线程没有维护的ThreadLocalMap对象
第二种情况: map存在, 但是没有与当前ThreadLocal关联的entry
*/
return setInitialValue();
}

/**
* 初始化
*
* @return the initial value 初始化后的值
*/
private T setInitialValue() {
// 调用initialValue获取初始化的值
// 此方法可以被子类重写, 如果不重写默认返回null
T value = initialValue();
// 获取当前线程对象
Thread t = Thread.currentThread();
// 获取此线程对象中维护的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
// 判断map是否存在
if (map != null)
// 存在则调用map.set设置此实体entry
map.set(this, value);
else
// 1)当前线程Thread 不存在ThreadLocalMap对象
// 2)则调用createMap进行ThreadLocalMap对象的初始化
// 3)并将 t(当前线程)和value(t对应的值)作为第一个entry存放至ThreadLocalMap中
createMap(t, value);
// 返回设置的值value
return value;
}

remove方法详解

  1. 首先获取当前线程,并根据当前线程获取一个Map
  2. 如果获取的Map不为空,则移除当前ThreadLocal对象对应的entry
1
2
3
4
5
6
7
8
9
10
11
12
/**
* 删除当前线程中保存的ThreadLocal对应的实体entry
*/
public void remove() {
// 获取当前线程对象中维护的ThreadLocalMap对象
ThreadLocalMap m = getMap(Thread.currentThread());
// 如果此map存在
if (m != null)
// 存在则调用map.remove
// 以当前ThreadLocal为key删除对应的实体entry
m.remove(this);
}

ThreadLocal内存泄漏问题

什么是内存泄漏

不再会被使用的对象或者变量占用的内存不能被回收,就是内存泄漏。

垃圾回收强、软、弱、虚4大引用

垃圾回收强、软、弱、虚4大引用

为什么源代码用弱引用?

1
2
3
4
5
public void function() {
ThreadLocal<String> tl = new ThreadLocal<>();
tl.set("test");
tl.get();
}
  1. 当function()方法执行完毕后,栈帧销毁强引用 tl 也就没有了。但此时线程的ThreadLocalMap里某个entry的key引用还指向这个对象
  2. 若这个key引用是强引用,就会导致key指向的ThreadLocal对象及value指向的对象不能被gc回收,造成内存泄漏
  3. 若这个key引用是弱引用就大概率会减少内存泄漏的问题(还有一个key为null的雷)。使用弱引用,就可以使ThreadLocal对象在方法执行完毕后顺利被回收且Entry的key引用指向为null

key为null的entry造成的内存泄漏

ThreadLocal与JVM

  1. ThreadLocalMap使用ThreadLocal的弱引用作为key,如果一个ThreadLocal没有外部强引用引用他,那么系统gc的时候,这个ThreadLocal势必会被回收,这样一来,ThreadLocalMap中就会出现key为null的Entry,就没有办法访问这些key为null的Entry的value,如果当前线程再迟迟不结束的话(比如正好用在线程池),这些key为null的Entry的value就会一直存在一条强引用链
  2. 虽然弱引用,保证了key指向的ThreadLocal对象能被及时回收,但是v指向的value对象是需要ThreadLocalMap调用get、set时发现key为null时才会去回收整个entry、value
  3. 因此弱引用不能100%保证内存不泄露。我们要在不使用某个ThreadLocal对象后,手动调用remoev方法来删除它,尤其是在线程池中,不仅仅是内存泄露的问题,因为线程池中的线程是重复使用的,意味着这个线程的ThreadLocalMap对象也是重复使用的,如果我们不手动调用remove方法,那么后面的线程就有可能获取到上个线程遗留下来的value值,造成bug
  4. 如果当前thread运行结束,threadLocal,threadLocalMap, Entry没有引用链可达,在垃圾回收的时候都会被系统进行回收
  5. 但在实际使用中我们有时候会用线程池去维护我们的线程,比如在Executors.newFixedThreadPool()时创建线程的时候,为了复用线程是不会结束的,所以threadLocal内存泄漏就值得我们小心

出现内存泄漏的真实原因

  1. 没有手动删除这个Entry
  2. currentThread依然运行

remove、get、set方法会去检查所有key为null的entry

1
2
3
4
5
6
7
8
9
10
11
12
13
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
1
2
3
4
5
6
7
8
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;

while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}

实际上都是调用expungeStaleEntry()方法,在里面将value设为null。

ThreadLocal小总结

  1. ThreadLocal本地线程变量,以空间换时间,线程自带的变量副本,人手一份,避免了线程安全问题
  2. 每个线程持有一个只属于自己的专属Map并维护了Thread Local对象与具体实例的映射,该Map由于只被持有它的线程访问,故不存在线程安全以及锁的问题
  3. ThreadLocalMap的Entry对ThreadLocal的引用为弱引用,避免了ThreadLocal对象无法被回收的问题
  4. 都会通过expungeStaleEntry,cleanSomeSlots, replaceStaleEntry这三个方法回收键为 null 的 Entry 对象的值(即为具体实例)以及 Entry 对象本身从而防止内存泄漏,属于安全加固的方法
  5. 用完之后一定要remove操作

Java对象内存布局和对象头

对象和数组在堆内存中的存储布局

  • 对象内部结构分为:对象头、实例数据、对齐填充(保证8个字节的倍数)。
  • 对象头分为对象标记(markOop)和类元信息(klassOop,又叫类型指针),类元信息存储的是指向该对象类元数据(klass)的首地址。

对象和数组在堆内存中的存储布局

对象头(Header)

对象标记Mark Word

对象标记里默认存储 (哈希值(HashCode )、GC分代年龄、锁状态标志、线程持有的锁、偏向线程ID、偏向时间戳)等信息

这些信息都是与对象自身定义无关的数据,所以MarkWord被设计成一个非固定的数据结构以便在极小的空间内存存储尽量多的数据。

它会根据对象的状态复用自己的存储空间,也就是说在运行期间MarkWord里存储的数据会随着锁标志位的变化而变化。

在64位系统中,Mark Word占了8个字节,类型指针占了8个字节,一共是16个字节

![Mark World的存储结构](../images/JUC并发编程/Mark World的存储结构.png)

类元信息Class Pointer(又叫类型指针)

对象指向它的类元数据的指针,虚拟机通过这个指针来确定这个对象是哪个类的实例

实例数据

说明:它是对象真正存储的有效信息,包括程序代码中定义的各种类型的字段(包括从父类继承下来的和本身拥有的字段)

规则:

  1. 相同宽度的字段总被分配在一起
  2. 父类中定义的变量会出现在子类之前
  3. 如果CompactFields参数为true(默认为true),子类的窄变量可能插入到父类变量的空隙

对齐填充

  1. 虚拟机要求对象起始地址必须是8字节的整数倍。填充数据不是必须存在的,仅仅是为了字节对齐。这部分内存按8字节补充对齐。
  2. 不是必须的,也没特别含义,仅仅起到占位符作用

聊聊Object obj = new Object()

引入pom

1
2
3
4
5
6
7
8
9
10
<!--
JAVA object layout
官网:http://openjdk.java.net/projects/code-tools/jol/
定位:分析对象在JVM的大小和分布
-->
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.9</version>
</dependency>

对象布局信息

字段 描述
OFFSET 偏移量,也就是到这个字段所占用的byte数
SIZE 后面类型的字节大小
TYPE Class中定义的类型
DESCRIPTION 类型的描述
VALUE TYPE在内存中的值,比如int就是0,boolean就是false

GC年龄标志位

GC年龄采用4位bit存储,最大为15,例如MaxTenuringThreshold参数默认值就是15

-XX:MaxTenuringThreshold=16

因为GC年龄占4位,最大就是1111=15

类型指针压缩

查看所有Java命令行标志

1
java -XX:+PrintCommandLineFlags -version

查看所有Java命令行标志

关闭类型指针压缩

jvm参数里加上:

1
-XX:-UseCompressedClassPointers

对象布局信息(关闭类型指针压缩)

synchronized锁升级

synchronized用的锁是存在Java对象头里的Mark word中,锁升级功能主要依赖Markword中锁标志位和释放偏向锁标志位

锁指向:

  • 偏向锁:MarkWord存储的是偏向的线程ID
  • 轻量锁:MarkWord存储的是指向线程栈Lock Record的指针;
  • 重量锁:Markword存储的是指向中的monitor对象的指针;

synchronized的性能变化

java5以前,只有synchronized,synchronized属于重量级锁,效率低下,因为监视器锁(monitor)是依赖于底层的操作系统的Mutex Lock来实现的,挂起线程和恢复线程都需要转入内核态去完成,阻塞或唤醒一个Java线程需要操作系统切换CPU状态来完成,这种状态切换需要耗费处理器时间,如果同步代码块中内容过于简单,这种切换的时间可能比用户代码执行的时间还长”,时间成本相对较高,这也是为什么早期的synchronized效率低的原因

Java 6以及之后,为了减少获得锁和释放锁所带来的性能消耗,引入了轻量级锁和偏向锁

为什么每一个对象都可以成为一个锁?

Java对象是天生的Monitor,每一个Java对象都有成为Monitor的潜质,因为在Java的设计中 ,每一个Java对象都有一个内部锁或者叫Monitor锁。

Hotspot锁状态

Hotspot锁状态

无锁

synchronized无锁对象布局信息

偏向锁-单线程竞争

当线程A第一次竞争到锁时,通过操作修改Mark Word中的偏向线程ID、偏向模式。

如果不存在其他线程竞争,那么持有偏向锁的线程将永远不需要进行同步

作用

当一段同步代码一直被同一个线程多次访问,由于只有一个线程,那么该线程在后续访问时便会自动获得锁(偏向锁)

理论

在实际应用运行过程中发现,“锁总是同一个线程持有,很少发生竞争”,也就是说锁总是被第一个占用他的线程拥有,这个线程就是锁的偏向线程

那么只需要在锁第一次被拥有的时候,记录下偏向线程ID。这样偏向线程就一直持有着锁(后续这个线程进入和退出这段加了同步锁的代码块时,不需要再次加锁和释放锁。而是直接去检查锁的MarkWord里面是不是自己的线程ID)。

如果相等,表示偏向锁是偏向于当前线程的,就不需要再尝试获得锁了,直到竞争发生才释放锁。以后每次同步,检查锁的偏向线程ID与当前线程ID是否一致,如果一致直接进入同步。无需每次加锁解锁都去CAS更新对象头。如果自始至终使用锁的线程只有一个,很明显偏向锁几乎没有额外开销,性能极高。

假如不等,表示发生了竞争,锁已经不是总是偏向于同一个线程了,这时候会尝试使用CAS来替换MarkWord里面的线程ID为新线程的ID

竞争成功,表示之前的线程不存在了,MarkWord里面的线程ID为新线程的ID,锁不会升级,仍然为偏向锁。

竞争失败,这个时候可能需要升级变为轻量级锁,才能保证线程间公平竞争锁。

偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程是不会主动释放偏向锁的

技术实现

一个synchronized方法被一个线程抢到了锁时,那这个方法所在的对象就会在其所在的Mark Word中将偏向锁修改状态位,同时还会有占用前54位来存储线程指针作为标识。若该线程再次访问同一个synchronized方法时,该线程只需去对象头的Mark Word 中去判断一下是否有偏向锁指向本身的ID,无需再进入 Monitor 去竞争对象了。

synchronized无锁态升级到偏向锁

JVM参数说明

延迟:实际上偏向锁在JDK1.6之后是默认开启的,但是启动时间有延迟,默认延迟是4秒,添加参数-XX:BiasedLockingStartupDelay=0,让其在程序启动时立刻启动。

开启偏向锁::XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0 并设置延迟为0s

关闭偏向锁:-XX:-UseBiasedLocking,关闭之后程序默认会直接进入轻量级锁状态。

偏向锁的撤销

偏向锁使用一种等到竞争出现才释放锁的机制,只有当其他线程竞争锁时,持有偏向锁的原来线程才会被撤销。撤销需要等待全局安全点(该时间点上没有字节码正在执行),同时检查持有偏向锁的线程是否还在执行

  1. 第一个线程正在执行synchronized方法(处于同步块),它还没有执行完,其它线程来抢夺,该偏向锁会被取消掉并出现锁升级。此时轻量级锁由原持有偏向锁的线程持有,继续执行其同步代码,而正在竞争的线程会进入自旋等待获得该轻量级锁
  2. 第一个线程执行完成synchronized方法(退出同步块),则将对象头设置成无锁状态并撤销偏向锁 ,重新偏向

synchronized锁升级流程

轻量级锁-多线程竞争

轻量级锁是为了在线程近乎交替执行同步块时提高性能

主要目的:在没有多线程竞争的前提下,通过CAS减少重量级锁使用操作系统互斥量产生的性能消耗,说白了先自旋再阻塞

升级时机:当关闭偏向锁功能或多线程竞争偏向锁会导致偏向锁升级为轻量级锁

假如线程A已经拿到锁,这时线程B又来抢该对象的锁,由于该对象的锁已经被线程A拿到,当前该锁已是偏向锁了。

而线程B在争抢时发现对象头Mark Word中的线程ID不是线程B自己的线程ID(而是线程A),那线程B就会进行CAS操作希望能获得锁。此时线程B操作中有两种情况

  1. 如果锁获取成功,直接替换Mark Word中的线程ID为B自己的ID(A → B),重新偏向于其他线程(即将偏向锁交给其他线程,相当于当前线程”被”释放了锁),该锁会保持偏向锁状态,A线程Over,B线程上位
  2. 如果锁获取失败,则偏向锁升级为轻量级锁,此时轻量级锁由原持有偏向锁的线程持有,继续执行其同步代码,而正在竞争的线程B会进入自旋等待获得该轻量级锁。

主要作用(本质就是自旋锁)

有线程来参与锁的竞争,但是获取锁的冲突时间极短

自旋达到一定次数和程度

java6之前(了解):默认启用,默认情况下自旋的次数是10次,-XX:PreBlockSpin=10来修改或者自旋线程数超过cpu核数一半

Java6之后:自适应(自适应意味着自旋的次数不是固定不变的),而是根据:同一个锁上一次自旋的时间和拥有锁线程的状态来决定。自适应自旋锁的大致原理:线程如果自旋成功了,那下次自旋的最大次数会增加,因为JVM认为既然上次成功了,那么这一次也很大概率会成功。反之,如果很少会自旋成功,那么下次会减少自旋的次数甚至不自旋,避免CPU空转。

轻量锁与偏向锁的区别和不同

  1. 争夺轻量级锁失败时,自旋尝试抢占锁
  2. 轻量级锁每次退出同步块都需要释放锁,而偏向锁是在竞争发生时才释放锁

重量级锁-多线程竞争激烈

有大量的线程参与锁的竞争,冲突性很高,会有用户态、内核态切换。

重量级锁原理

Java中synchronized的重量级锁,是基于进入和退出Monitor对象实现的。在编译时会将同步块的开始位置插入monitor enter指令,在结束位置插入monitor exit指令。

当线程执行到monitor enter指令时,会尝试获取对象所对应的Monitor所有权,如果获取到了,即获取到了锁,会在Monitor的owner中存放当前线程的id,这样它将处于锁定状态,除非退出同步块,否则其他线程无法获取到这个Monitor。

hashCode存放

锁升级为轻量级或重量级锁后,Mark Word中保存的分别是线程栈帧里的锁记录指针和重量级锁指针,已经没有位置再保存哈希码,GC年龄了,那么这些信息被移动到哪里去了呢?

在无锁状态下,Mark Word中可以存储对象的identity hash code值。当对象的hashCode()方法第一次被调用时,JVM会生成对应的identity hash code值并将该值存储到Mark Word中。

对于偏向锁,在线程获取偏向锁时,会用Thread ID和epoch值覆盖identity hash code 所在的位置。如果一个对象的hashCode()方法己经被调用过一次之后,这个对象不能被设置偏向锁。因为如果可以的话,那Mark Word中的identity hash code必然会被偏向线程ID给覆盖,这就会造成同一个对象前后两次调用hashCode()方法得到的结果不一致。

升级为轻量级锁时,JVM会在当前线程的栈帧中创建一个锁记录(Lock Record)空间,用于存储锁对象的Mark Word拷贝,该拷贝中可以包含identity hash code, 所以轻量级锁可以和identity hash code共存,哈希码和GC年龄自然保存在此,释放锁后会将这些信息写回到对象头。

升级为重量级锁后,Mark Word保存的重量级锁指针,代表重量级锁的ObjectMonitor类里有字段记录非加锁状态下的Mark Word,锁释放后也会将信息写回到对象头。

各个锁的优缺点的对比

优点 缺点 适用场景
偏向锁 加锁和解锁不需要额外的消耗,和执行非同步方法相比仅存在纳秒级的差距 如果线程间存在锁竞争,会带来额外的锁撤销的消耗 适用于只有一个线程访问同步块场景
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度 如果始终得不到锁竞争的线程,使用自旋会消耗CPU 追求响应时间,同步块执行速度非常快
重量级锁 线程竞争不使用自旋,不会消耗CPU 线程阻塞,响应时间缓慢 追求吞吐量,同步块执行速度较慢

synchronized锁升级过程总结:一句话,就是先自旋,不行再阻塞。

实际上是把之前的悲观锁(重量级锁)变成在一定条件下使用偏向锁以及使用轻量级(自旋锁CAS)的形式

synchronized在修饰方法和代码块在字节码上实现方式有很大差异,但是内部实现还是基于对象头的MarkWord来实现的

JDK1.6之前synchronized使用的是重量级锁,JDK1.6之后进行了优化,拥有了无锁->偏向锁->轻量级锁->重量级锁的升级过程,而不是无论什么情况都使用重量级锁。

总结

  • 偏向锁:适用于单线程适用的情况,在不存在锁竞争的时候进入同步方法/代码块则使用偏向锁。
  • 轻量级锁:适用于竞争较不激烈的情况(这和乐观锁的使用范围类似), 存在竞争时升级为轻量级锁,轻量级锁采用的是自旋锁,如果同步方法/代码块执行时间很短的话,采用轻量级锁虽然会占用cpu资源但是相对比使用重量级锁还是更高效。
  • 重量级锁:适用于竞争激烈的情况,如果同步方法/代码块执行时间很长,那么使用轻量级锁自旋带来的性能消耗就比使用重量级锁更严重,这时候就需要升级为重量级锁

锁消除

从JIT(just-in-time compiler,及时编译器)角度看相当于无视它,synchronized (o)不存在了,这个锁对象并没有被共用扩散到其它线程使用,极端的说就是根本没有加这个锁对象的底层机器码,消除了锁的使用

锁粗化

假如方法中首尾相接,前后相邻的都是同一个锁对象,那JIT编译器就会把这几个synchronized块合并成一个大块,加粗加大范围,一次申请锁使用即可,避免次次的申请和释放锁,提升了性能

AQS(AbstractQueuedSynchronizer)

AQS是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,主要解决锁的分配问题,通过内置的CLH(FIFO队列的变种)来完成资源获取线程的排队工作,将每条将要去抢占资源的线程封装成一个Node节点来实现锁的分配,有一个int类变量表示持有锁的状态(private volatile int state),通过CAS完成对status值的修改(0表示没有,1表示阻塞)

CLH:Craig、Landin and Hagersten 队列,是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO,从尾部入队,从头部出队

加锁会导致阻塞、有阻塞就需要排队,实现排队必然需要队列

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node) ,通过CAS、自旋以及LockSuport.park()的方式,维护state变量的状态,使并发达到同步的效果

CLH(FIFO)队列

锁和同步器的关系

  • 锁:面向锁的使用者(定义了程序员和锁交互的使用层API,隐藏了实现细节,你调用即可)
  • 同步器:面向锁的实现者(比如Java并发大神Douglee,提出统一规范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等,是一切锁和同步组件实现的公共基础部分,这里用到了模板方法模式)

AQS框架

AQS框架

ReentrantLock

ReentrantLock类图

ReentrantLock类图

![AQS Node类](../images/JUC并发编程/AQS Node类.png)

从最简单的lock方法开始看看公平和非公平

FairSync.lock()

NonfairSync.lock()

非公平锁会在调用lock()方法时尝试修改state状态来获取到锁,如果成功了就会设置当前线程拥有独占访问权限。

公平锁和非公平锁tryAcquire()区别

可以明显看出公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件:hasQueuedPredecessors()

hasQueuedPredecessors()是公平锁加锁时判断等待队列中是否存在有效节点的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 判断当前线程之前有没有排队的线程
* @return code 如果在当前线程之前有排队的线程,则为true ;如果当前线程位于队列的头部或队列为空,则为false
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

差异

  • 公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中己经有线程在等待,那么当前线程就会进入等待队列中;
  • 非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程苏醒后,不一定就是排头的这个线程获得锁,它还是需要参加竞争锁(存在线程竞争的情况下),后来的线程可能不讲武德插队夺锁了。

非公平锁lock()方法详解

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))// 第一个线程抢占
setExclusiveOwnerThread(Thread.currentThread());
else// 第二个线程及后续线程抢占
acquire(1);
}

acquire()

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquire -> addWaiter -> acquireQueued

尝试tryAcquire获取锁,如果失败了则会取反为true走后续的判断,addWaiter去

tryAcquire()

java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();// 获取当前state,为0代表没有线程占用,可以获得锁,为1代表已经有现成占用
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判断当前线程和占用锁的线程是否是同一个,也就是可重入锁的实现
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter()

java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 为当前线程以给定模式创建和入队节点。
*
* @param mode Node.EXCLUSIVE 为独占,Node.SHARED 为共享
* @return 新节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {// 判断当前队列的为节点是否为null,为null表示还未创建等待队列,走后续代码去创建,不为null就直接设置新节点的prev为尾节点,然后cas添加新节点,成功后设置尾节点的next为当新节点,然后返回新节点。
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

/**
* 将节点插入队列,必要时进行初始化
* @param node 要插入的节点
* @return 新节点的前一个节点
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize 初始化一个节点作为头结点,傀儡节点,作为哨兵节点,占位
if (compareAndSetHead(new Node()))
tail = head;
} else {// 带头节点的尾插法插入新的节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

非公平锁这里使用的是addWaiter(Node.EXCLUSIVE),可以看到是使用的独占方式的节点

acquireQueued()

java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
/**
* 在等待队列中以独占不间断模式获取。由条件等待方法以及获取方法使用
Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {// 自旋,一直去尝试acquire
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {// 如果前一个节点是head后,再次尝试抢占锁,队列里的线程此时就是按顺序的,因为是非公平锁,此时也有可能有新的线程也tryAcquire,此时此unpark的节点与新线程竞争,成功后返回true,执行后续代码,失败又会被park,继续等待unpark
// 到这儿代表当前线程cas state为1,然后将前一节点设置为head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// FailedAcquire后应该park,先去设置前一个Node的waitStatus为Node.SIGNAL,成功后再LockSupport.park
// 当调用了unpark()后parkAndCheckInterrupt()返回了false,又进入循环,当前node.predecessor()就是head了
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 如果此节点是Node.SIGNAL,即等待被占用的资源释放,可以安全的park,返回true执行后续的parkAndCheckInterrupt,
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* 大于0表示当前节点是cancelled,循环判断前面的节点是否也是cancelled,忽略该节点状态,重新连接队列
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 将当前节点设置为SIGNAL,返回false,又继续一次tryAcquire,还是失败后继续走此方法后判断waitStatus为SIGNAL
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

cancelAcquire()

java.util.concurrent.locks.AbstractQueuedSynchronizer#cancelAcquire

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null;

// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;

// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

unlock()方法详解

1
2
3
public void unlock() {
sync.release(1);
}

实际调用sync的release()方法,1代表持有计数减1。

release()

java.util.concurrent.locks.AbstractQueuedSynchronizer#release

1
2
3
4
5
6
7
8
9
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease()

java.util.concurrent.locks.ReentrantLock.Sync#tryRelease

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {// 为0才表示当前线程的state数量全减完了,可重入锁的state就不为一,需要多次调用unlock()方法将state减至0
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor()

java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

总结

具体流程图

ReentrantReadWriteLock&&StampLock

ReentrantReadWriteLock

排他锁和共享锁

排它锁:又称独占锁,独享锁,synchronized,ReentrantLock都是排它锁

共享锁:又称为读锁,获得共享锁后,可以查看,但无法删除和修改数据, 其他线程此时也可以获取到共享锁,也可以查看但是无法修改和删除数据

  • 共享锁和排它锁典型是ReentranReadWriteLock
  • 写锁是排它锁
  • 读锁是共享锁

读写锁的锁降级

锁降级:将写入锁降级为读锁,锁的严苛程度变强叫做升级,反之叫做降级。

特性 说明
公平性选择 支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平
重进入 该锁支持重进人,以读写线程为例:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取了写锁之后能够再次获取写锁,同时也可以获取读锁
锁降级 遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁

写锁的降级,降级成为了读锁。即如果同一个线程持有了写锁,在没有释放写锁的情况下,它还可以继续获得读锁。这就是写锁的降级,降级成为了读锁。

降级的规则

  1. 按照先获取写锁,然后获取读锁,再释放写锁的次序。
  2. 如果释放了写锁,那么就完全转换为读锁。

在ReentrantReadWriteLock中,当读锁被使用,如果有线程尝试获取写锁,该写线程会被阻塞。需要释放所有读锁后才能获取写锁,即不可以锁升级。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}

try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}

代码中声明了volatile类型的cacheValid变量,保证可见性。

先获取读锁,如果cache不可用,则释放读锁,获取写锁,在更改数据之前,再检查一次cacheValid的值(防止别的写操作已经改变了缓存状态),然后修改数据,将cacheValid置为true,然后在释放写锁前获取读锁;此时,cache中数据可用,处理cache中数据,最后释放读锁。这个过程就是一个完整的锁降级的过程,目的是保证数据可见性,如果当前的线程C在修改完cache中的数据后,没有获取读锁而是直接释放了写锁,那么假设此时另一个线程T获取了写锁并修改了数据,那么C线程无法感知到数据已被修改,则数据出现错误。如果遵循锁降级的步骤,线程C在释放写锁之前获取读锁,那么线程T在获取写锁时将被阻塞,直到线程C完成数据处理过程,释放读锁。

如果违背锁降级的步骤?

当前的线程C在修改完cache中的数据后,没有获取读锁而是直接释放了写锁,那么假设此时另一个线程D获取了写锁并修改了数据(即代码中的data,此时业务逻辑需要直接使用data,线程D可能拿到写锁已经修改了data,此时线程C内的data就是脏数据)那么C线程无法感知到数据已被修改,则数据出现错误。

如果遵循锁降级的步骤?

线程C在释放写锁之前获取读锁,那么线程D在获取写锁的时候将会被阻塞,直到线程C完成后续数据处理过程释放读锁:

  • 写后立刻读:写后读的数据保证是这次更新的数据,该机制是专门为缓存设计的
  • 写后重入读:同一个线程自己持有写锁时再去拿读锁,其本质相当于重入

写锁和读锁是互斥的

写锁和读锁是互斥的(这里的互斥是指线程间的互斥,当前线程可以获取到写锁又获取到读锁,但是获取到了读锁不能继续获取写锁),这是因为读写锁要保持写操作的可见性。因为,如果允许读锁在被获取的情况下对写锁的获取,那么正在运行的其他读线程无法感知到当前写线程的操作。

如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即ReentrantReadWriteLock读的过程中不允许写,只有等待线程都释放了读锁,当前线程才能获取写锁,也就是写入必须等待,这是一种悲观的读锁

写锁饥饿问题

读读共享是优点,但是与此同时也造成了写操作的饥饿现象

读锁没有完成之前,写锁无法获得,因为人家还在读着那,你先别去写,省的数据乱,这是一种悲观锁的策略。倘若读的时候别的线程还能修改的话,那读的数据就是脏数据了

缓解写锁饥饿问题:使用“公平”策略可以一定程度上缓解这个问题,即new ReentrantReadWriteLock(true);但是”公平”策略是以牺牲系统吞吐量为代价的。

StampedLock是由锁饥饿问题引出的

ReentrantReadWriteLock的读锁被占用的时候,其他线程尝试获取写锁的时候会被阻塞。

  • 但是StampedLock采取乐观获取读锁,获取读锁之后其他线程再尝试获取写锁时不会被阻塞,这其实是对读锁的优化。使用乐观读锁模式可以提高吞吐量
  • 所以,在获取乐观读锁后,还需要对结果进行校验。

StampedLock原理

邮戳锁的基本特点:

  1. 所有获取锁的方法,都返回一个邮戳(Stamp), Stamp 为零表示获取失败,其余都表示成功;
  2. 所有释放锁的方法,都需要一个邮戳(Stamp),这个 Stamp 必须是和成功获取锁时得到的Stamp一致;
  3. StampedLock 是不可重入的,没有Re开头。危险(如果一个线程已经持有了写锁,再去获取写锁的话就会造成死锁)
  4. StampedLock 的悲观读锁和写锁都不支持条件变量(Condition),这个也需要注意。
  5. 使用 StampedLock 一定不要调用中断操作,即不要调用 interrupt() 方法

StampedLock有三种访问模式:

  1. Reading(读模式悲观):功能和 ReentrantReadWriteLock 的读锁类似
  2. Writing(写模式悲观):功能和 ReentrantReadWriteLock 的写锁类似
  3. Optimistic reading(乐观读模式):无锁机制,类似于数据库中的乐观锁,支持读写并发,很乐观认为读取时没人修改,假如被修改再实现升级为悲观读模式
    1. 乐观的读。仅当锁定当前末处于写入模式时,方法 tryOptimisticRead() 才返回非零戳记。如果自获得给定标记以来没有在写入模式下获取锁定,则方法 validate(long) 返回 true 。这种模式可以被认为是读锁的极弱版本,可以随时被作者破坏。对短的只读代码段使用乐观模式通常可以减少争用并提高吞吐量。但是,它的使用本质上是脆弱的。
    2. 乐观读取部分应该只读取宇段并将它们保存在局部变量中,以便以后在验证后使用。在乐观模式下读取的字段可能非常不一致,因此仅在您熟悉数据表示以检查一致性和/或重复调用方法 validate(),例如,在首次读取对象或数组引用,然后访问其中一个字段,元素或方法时,通常需要执行此类步骤。