共享模型之管程

目录:

1、共享问题

2、synchronized

3、线程安全分析

4、Monitor

5、wait/notify

6、线程状态转换

7、活跃性

8、Lock

4.1 线程出现问题的根本原因分析

线程出现问题的根本原因是因为线程上下文切换,导致线程里的指令没有执行完就切换执行其它线程了,下面举一个例子

  • 两个线程对初始值为0的静态变量一个做自增,一个做自减,各做5000次,结果是0吗?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static int count = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()->{
for (int i = 1;i<5000;i++){
count++;
}
});
Thread t2 =new Thread(()->{
for (int i = 1;i<5000;i++){
count--;
}
});
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("count的值是{}",count);
}

我将从字节码的层面进行分析:

以上的结果可能是正数、负数、零。为什么呢?因为 Java 中对静态变量的自增,自减并不是原子操作,要彻底理解,必须从字节码来进行分析

例如对于 i++ 而言(i 为静态变量),实际会产生如下的 JVM 字节码指令:

1
2
3
4
getstatic i // 获取静态变量i的值
iconst_1 // 准备常量1
iadd // 自增
putstatic i // 将修改后的值存入静态变量i

而对应 i-- 也是类似:

1
2
3
4
getstatic i // 获取静态变量i的值
iconst_1 // 准备常量1
isub // 自减
putstatic i // 将修改后的值存入静态变量i

可以看到count++count-- 操作实际都是需要这个4个指令完成的,那么这里问题就来了!Java 的内存模型如下,完成静态变量的自增,自减需要在主存和工作内存中进行数据交换:

如果代码是正常按顺序运行的,那么count的值不会计算错

出现负数的情况:

出现正数的情况:

问题的进一步描述

临界区 Critical Section

  1. 一个程序运行多线程本身是没有问题的

  2. 问题出现在多个线程访问共享资源

    1. 多个线程同时对共享资源进行读操作本身也没有问题
    2. 问题出现在对对共享资源同时进行读写操作时就有问题了
  3. 先定义一个叫做临界区的概念:一段代码内如果存在对共享资源的多线程读写操作,那么称这段代码为临界区,如

    1
    2
    3
    4
    5
    6
    7
    8
    9
    static int counter = 0;
    static void increment()
    {// 临界区
    counter++;
    }
    static void decrement()
    {// 临界区
    counter--;
    }

竞态条件 Race Condition

多个线程在临界区执行,那么由于代码指令的执行不确定而导致的结果问题,称为竞态条件

4.2 synchronized 解决方案

为了避免临界区中的竞态条件发生,由多种手段可以达到

  • 阻塞式解决方案:synchronized ,Lock
  • 非阻塞式解决方案:原子变量

现在讨论使用synchronized来进行解决,即俗称的对象锁,它采用互斥的方式让同一时刻至多只有一个线程持有对象锁,其他线程如果想获取这个锁就会阻塞住,这样就能保证拥有锁的线程可以安全的执行临界区内的代码,不用担心线程上下文切换

注意 虽然 java 中互斥和同步都可以采用 synchronized 关键字来完成,但它们还是有区别的:

  • 互斥是保证临界区的竞态条件发生,同一时刻只能有一个线程执行临界区的代码

  • 同步是由于线程执行的先后,顺序不同但是需要一个线程等待其它线程运行到某个点。

synchronized

1
2
3
4
synchronized(对象) // 线程1获得锁, 那么线程2的状态是(blocked)
{
临界区
}

上面的实例程序使用synchronized后如下,计算出的结果是正确!Test13.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static int counter = 0;
static final Object room = new Object();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (room) {
counter++;
}
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 5000; i++) {
synchronized (room) {
counter--;
}
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
log.debug("{}",counter);
}

思考

synchronized实际是 用对象锁保证了临界区中代码的原子性,临界区内的代对码对外是不可分割的,不会被线程切换所打断。

为了加深理解思考下面的问题:

  • 如果吧synchronized(obj)放在for循环外面如何理解?

    放在外面相当于给整个循环加上了锁那么就是保护了循环次数i++/--的指令这期间不会被其他线程干扰

  • 如果t1 synchronized(obj1)但t2 synchronized(obj2) 会怎么样?

    不可以,t1,t2如果锁的不是一个对象,那么当发生上下文切换的时候,别的线程不会被阻塞

  • 如果t1synchronized(obj)t2没有会怎么样?如何理解

    t2不加锁的话他可以直接拿到共享变量,不会被阻塞住

synchronized原理

synchronized实际上利用对象保证了临界区代码的原子性,临界区内的代码在外界看来是不可分割的,不会被线程切换所打断

方法上的 synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
    class Test{
public synchronized void test() {

}
}
//等价于
class Test{
public void test() {
synchronized(this) {

}
}
}
//------------------------------------------------------------------------------------------------
class Test{
public synchronized static void test() {
}
}
// 等价于
class Test{
public static void test() {
synchronized(Test.class) {

}
}
}

所谓的线程八锁问题

其实就是考察 synchronized 锁住的是哪个对象

情况1:12 或 21

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j(topic = "c.Number")
class Number{
public synchronized void a() {
log.debug("1");
}
public synchronized void b() {
log.debug("2");
}
}
public static void main(String[] args) {
Number n1 = new Number();
new Thread(()->{ n1.a(); }).start();
new Thread(()->{ n1.b(); }).start();
}

情况2:1s后1 2,或 2 1s后 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j(topic = "c.Number")
class Number{
public synchronized void a() {
sleep(1);
log.debug("1");
}
public synchronized void b() {
log.debug("2");
}
}
public static void main(String[] args) {
Number n1 = new Number();
new Thread(()->{ n1.a(); }).start();
new Thread(()->{ n1.b(); }).start();
}

情况3:3 1s 12 或 23 1s 1 或 32 1s 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j(topic = "c.Number")
class Number{
public synchronized void a() {
sleep(1);
log.debug("1");
}
public synchronized void b() {
log.debug("2");
}
public void c() {
log.debug("3");
}
}
public static void main(String[] args) {
Number n1 = new Number();
new Thread(()->{ n1.a(); }).start();
new Thread(()->{ n1.b(); }).start();
new Thread(()->{ n1.c(); }).start();
}

情况4:2 1s 后 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j(topic = "c.Number")
class Number{
public synchronized void a() {
sleep(1);
log.debug("1");
}
public synchronized void b() {
log.debug("2");
}
}
public static void main(String[] args) {
Number n1 = new Number();
Number n2 = new Number();
new Thread(()->{ n1.a(); }).start();
new Thread(()->{ n2.b(); }).start();
}

情况5:2 1s 后 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j(topic = "c.Number")
class Number{
public static synchronized void a() {
sleep(1);
log.debug("1");
}
public synchronized void b() {
log.debug("2");
}
}
public static void main(String[] args) {
Number n1 = new Number();
new Thread(()->{ n1.a(); }).start();
new Thread(()->{ n1.b(); }).start();
}

情况6:1s 后12, 或 2 1s后 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j(topic = "c.Number")
class Number{
public static synchronized void a() {
sleep(1);
log.debug("1");
}
public static synchronized void b() {
log.debug("2");
}
}
public static void main(String[] args) {
Number n1 = new Number();
new Thread(()->{ n1.a(); }).start();
new Thread(()->{ n1.b(); }).start();
}

情况7:2 1s 后 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j(topic = "c.Number")
class Number{
public static synchronized void a() {
sleep(1);
log.debug("1");
}
public synchronized void b() {
log.debug("2");
}
}
public static void main(String[] args) {
Number n1 = new Number();
Number n2 = new Number();
new Thread(()->{ n1.a(); }).start();
new Thread(()->{ n2.b(); }).start();
}

情况8:1s 后12, 或 2 1s后 1

  • 因为方法a和b 都被 static 修饰了,锁的是类对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Slf4j(topic = "c.Number")
class Number{
public static synchronized void a() {
sleep(1);
log.debug("1");
}
public static synchronized void b() {
log.debug("2");
}
}
public static void main(String[] args) {
Number n1 = new Number();
Number n2 = new Number();
new Thread(()->{ n1.a(); }).start();
new Thread(()->{ n2.b(); }).start();
}

4.3 变量的线程安全分析

4.3.1 成员变量和静态变量的线程安全分析

  • 如果没有变量没有在线程间共享,那么变量是安全的
  • 如果变量在线程间共享
    • 如果只有读操作,则线程安全
    • 如果有读写操作,则这段代码是临界区,需要考虑线程安全

4.3.2 局部变量线程安全分析

  • 局部变量【局部变量被初始化为基本数据类型】是安全的
  • 局部变量引用的对象未必是安全的(逃逸分析
    • 如果局部变量引用的对象没有引用线程共享的对象,那么是线程安全的
    • 如果局部变量引用的对象引用了一个线程共享的对象,那么要考虑线程安全的

线程安全的情况

局部变量【局部变量被初始化为基本数据类型】是安全的,示例如下

1
2
3
4
public static void test1() {
int i = 10;
i++;
}

每个线程调用 test1() 方法时局部变量 i,会在每个线程的栈帧内存中被创建多份,因此不存在共享

线程不安全的情况

如果局部变量引用的对象逃离方法的范围,那么要考虑线程安全的,代码示例如下 Test15.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Test15 {
public static void main(String[] args) {
UnsafeTest unsafeTest = new UnsafeTest();
for (int i =0;i<100;i++){
new Thread(()->{
unsafeTest.method1();
},"线程"+i).start();
}
}
}
class UnsafeTest{
ArrayList<String> arrayList = new ArrayList<>();
public void method1(){
for (int i = 0; i < 100; i++) {
method2();
method3();
}
}
private void method2() {
arrayList.add("1");
}
private void method3() {
arrayList.remove(0);
}
}
不安全原因分析

无论哪个线程中的 method2 和method3 引用的都是同一个对象中的 list 成员变量:一个 ArrayList ,在添加一个元素的时候,它可能会有两步来完成:

  1. 第一步,在 arrayList[Size] 的位置存放此元素; 第二步增大 Size 的值。
  2. 在单线程运行的情况下,如果 Size = 0,添加一个元素后,此元素在位置 0,而且 Size=1;而如果是在多线程情下,比如有两个线程,线程 A 先将元素存放在位置 0。但是此时 CPU 调线程A暂停,线程 B 得到运行的机会。线程B也向此 ArrayList 添加元素,因为此时 Size 仍等于 0 (注意哦,我们假设的是添加一个元素是要两个步骤哦,而线程A仅仅完成了步骤1),所以线程B也将元素存放在位置0。然后线程A和线程B都继续运行,都增加 Size 的值。 那好,现在我们来看看 ArrayList 的情况,元素实际上只有一个,存放在位置 0,而 Size 却等于 2。这就是“线程不 安全”了。

解决方法

可以将list修改成局部变量,那么就不会有上述问题了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class safeTest{
public void method1(){
ArrayList<String> arrayList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
method2(arrayList);
method3(arrayList);}
}
private void method2(ArrayList arrayList) {
arrayList.add("1");
}
private void method3(ArrayList arrayList) {
arrayList.remove(0);
}
}
思考 private 或 final的重要性

方法访问修饰符带来的思考,如果把 method2 和 method3 的方法修改为 public 会不会导致线程安全问题?

情况1:有其它线程调用 method2 和 method3;

情况2:在情况1 的基础上,为 ThreadSafe 类添加子类,子类覆盖 method2 或 method3 方法,即

  • 从这个例子可以看出 private 或 final 提供【安全】的意义所在,请体会开闭原则中的【闭】
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class ThreadSafe {
public final void method1(int loopNumber) {
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < loopNumber; i++) {
method2(list);
method3(list);
}
}
private void method2(ArrayList<String> list) {
list.add("1");
}
private void method3(ArrayList<String> list) {
list.remove(0);
}
}
class ThreadSafeSubClass extends ThreadSafe{
@Override
public void method3(ArrayList<String> list) {
new Thread(() -> {
list.remove(0);
}).start();
}
}

4.3.3 常见线程安全类

  1. String
  2. Integer
  3. StringBuffer
  4. Random
  5. Vector
  6. Hashtable
  7. java.util.concurrent 包下的类

这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的。也可以理解为它们的每个方法是原子的

1
2
3
4
5
6
7
8
Hashtable table = new Hashtable();
new Thread(()->{
table.put("key", "value1");
}).start();
new Thread(()->{
table.put("key", "value2");
}).start();

线程安全类方法的组合

但注意它们多个方法的组合不是原子的,见下面分析

1
2
3
4
5
Hashtable table = new Hashtable();
// 线程1,线程2
if( table.get("key") == null) {
table.put("key", value);
}

不可变类的线程安全

StringInteger类都是不可变的类,因为其类内部状态是不可改变的,因此它们的方法都是线程安全的

有同学或许有疑问,Stringreplacesubstring 等方法【可以】改变值啊,其实调用这些方法返回的已经是一个新创建的对象了!

1
2
3
4
5
6
7
8
9
10
11
12
public class Immutable{
private int value = 0;
public Immutable(int value){
this.value = value;
}
public int getValue(){
return this.value;
}
public Immutable add(int v){
return new Immutable(this.value + v);
}
}

示例分析-是否线程安全

例1:MyServlet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class MyServlet extends HttpServlet {
// 是否安全? HashMap不安全,HashTable线程安全
Map<String,Object> map = new HashMap<>();

// 是否安全? 字符串不可变类,线程安全
String S1 = "...";

// 是否安全? 不可变类,线程安全
final String S2 = "...";

// 是否安全? 线程不安全
Date D1 = new Date();

// 是否安全? 线程安全(final)
final Date D2 = new Date();

public void doGet(HttpServletRequest request, HttpServletResponse response) {
// 使用上述变量
}
}

例2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MyServlet extends HttpServlet {
// 是否安全? 线程不安全(里面有count成员,这是个有状态的对象)
private UserService userService = new UserServiceImpl();

public void doGet(HttpServletRequest request, HttpServletResponse response) {
userService.update(...);
}
}
public class UserServiceImpl implements UserService {
// 记录调用次数
private int count = 0;

public void update() {
// ...
count++;
}
}

例3:切面 MyAspect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Aspect
@Component
public class MyAspect {
// 是否安全? 线程安全(因为默认单例,看scope)
private long start = 0L;

@Before("execution(* *(..))")
public void before() {
start = System.nanoTime();
}

@After("execution(* *(..))")
public void after() {
long end = System.nanoTime();
System.out.println("cost time:" + (end-start));
}
}

例4:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MyServlet extends HttpServlet {
// 是否安全 线程安全(私有的,不可变的)
private UserService userService = new UserServiceImpl();

public void doGet(HttpServletRequest request, HttpServletResponse response) {
userService.update(...);
}
}
public class UserServiceImpl implements UserService {
// 是否安全 线程安全(因为这个Dao没有成员变量)
private UserDao userDao = new UserDaoImpl();

public void update() {
userDao.update();
}

// 没有成员变量的 类 都是线程安全的
public class UserDaoImpl implements UserDao {
public void update() {
String sql = "update user set password = ? where username = ?";
// 是否安全 线程安全(因为connection都是方法区中的局部变量)
try (Connection conn = DriverManager.getConnection("","","")){
// ...
} catch (Exception e) {
// ...
}
}
}

例5:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class MyServlet extends HttpServlet {
// 是否安全
private UserService userService = new UserServiceImpl();

public void doGet(HttpServletRequest request, HttpServletResponse response) {
userService.update(...);
}
}
public class UserServiceImpl implements UserService {
// 是否安全
private UserDao userDao = new UserDaoImpl();

public void update() {
userDao.update();
}
}
public class UserDaoImpl implements UserDao {
// 是否安全 线程不安全(将Connection做成全局变量,当a执行完后close,b可能还在执行)
private Connection conn = null;
public void update() throws SQLException {
String sql = "update user set password = ? where username = ?";
conn = DriverManager.getConnection("","","");
// ...
conn.close();
}
}

例6:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MyServlet extends HttpServlet {
// 是否安全
private UserService userService = new UserServiceImpl();

public void doGet(HttpServletRequest request, HttpServletResponse response) {
userService.update(...);
}
}
public class UserServiceImpl implements UserService {
public void update() {
UserDao userDao = new UserDaoImpl();
userDao.update();
}
}
public class UserDaoImpl implements UserDao {
// 是否安全 线程安全(因为Serviceimpl中将userdao放在方法内,无线程安全问题但是不推荐这样写)
private Connection = null;
public void update() throws SQLException {
String sql = "update user set password = ? where username = ?";
conn = DriverManager.getConnection("","","");
// ...
conn.close();
}
}

例7:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public abstract class Test {

public void bar() {
// 是否安全 线程不安全(调用foo,子类可能重写abstract方法对sdf进行修改)
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
foo(sdf);
}

public abstract foo(SimpleDateFormat sdf);


public static void main(String[] args) {
new Test().bar();
}
}

其中 foo 的行为是不确定的,可能导致不安全的发生,被称之为外星方法

1
2
3
4
5
6
7
8
9
10
11
12
public void foo(SimpleDateFormat sdf) {
String dateStr = "1999-10-11 00:00:00";
for (int i = 0; i < 20; i++) {
new Thread(() -> {
try {
sdf.parse(dateStr);
} catch (ParseException e) {
e.printStackTrace();
}
}).start();
}
}

请比较 JDK 中 String 类的实现

例8:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static Integer i = 0;
public static void main(String[] args) throws InterruptedException {
List<Thread> list = new ArrayList<>();
for (int j = 0; j < 2; j++) {
Thread thread = new Thread(() -> {
for (int k = 0; k < 5000; k++) {
synchronized (i) {
i++;
}
}
}, "" + j);
list.add(thread);
}
list.stream().forEach(t -> t.start());
list.stream().forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
log.debug("{}", i);
}

4.4 习题分析

卖票练习

测试下面代码是否存在线程安全问题,并尝试改正

  • 将sell方法声明为synchronized即可
  • 注意只将对count进行修改的一行代码用synchronized括起来也不行。对count大小的判断也必须是为原子操作的一部分,否则也会导致count值异常。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class ExerciseSell {
public static void main(String[] args) {
TicketWindow ticketWindow = new TicketWindow(2000);
List<Thread> list = new ArrayList<>();
// 用来存储买出去多少张票
List<Integer> sellCount = new Vector<>();
for (int i = 0; i < 2000; i++) {
Thread t = new Thread(() -> {
// 分析这里的竞态条件
int count = ticketWindow.sell(randomAmount());
sellCount.add(count);
});
list.add(t);
t.start();
}
list.forEach((t) -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 买出去的票求和
log.debug("selled count:{}",sellCount.stream().mapToInt(c -> c).sum());
// 剩余票数
log.debug("remainder count:{}", ticketWindow.getCount());
}
// Random 为线程安全
static Random random = new Random();
// 随机 1~5
public static int randomAmount() {
return random.nextInt(5) + 1;
}
}
class TicketWindow {
private int count;
public TicketWindow(int count) {
this.count = count;
}
public int getCount() {
return count;
}
//在方法上加一个synchronized即可
public int sell(int amount) {
if (this.count >= amount) {
this.count -= amount;
return amount;
} else {
return 0;
}
}
}

另外,用下面的代码行不行,为什么?

  • 不行,因为sellCount会被多个线程共享,必须使用线程安全的实现类。
1
List<Integer> sellCount = new ArrayList<>();

测试脚本

1
for /L %n in (1,1,10) do java -cp ".;C:\Users\manyh\.m2\repository\ch\qos\logback\logbackclassic\1.2.3\logback-classic-1.2.3.jar;C:\Users\manyh\.m2\repository\ch\qos\logback\logbackcore\1.2.3\logback-core-1.2.3.jar;C:\Users\manyh\.m2\repository\org\slf4j\slf4japi\1.7.25\slf4j-api-1.7.25.jar" cn.itcast.n4.exercise.ExerciseSell

说明:

  • 两段没有前后因果关系的临界区代码,只需要保证各自的原子性即可,不需要括起来。

转账练习

测试下面代码是否存在线程安全问题,并尝试改正

  • 将transfer方法的方法体用同步代码块包裹,将当Account.class设为锁对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class ExerciseTransfer {
public static void main(String[] args) throws InterruptedException {
Account a = new Account(1000);
Account b = new Account(1000);
Thread t1 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
a.transfer(b, randomAmount());
}
}, "t1");
Thread t2 = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
b.transfer(a, randomAmount());
}
}, "t2");
t1.start();
t2.start();
t1.join();
t2.join();
// 查看转账2000次后的总金额
log.debug("total:{}",(a.getMoney() + b.getMoney()));
}
// Random 为线程安全
static Random random = new Random();
// 随机 1~100
public static int randomAmount() {
return random.nextInt(100) +1;
}
}
class Account {
private int money;
public Account(int money) {
this.money = money;
}
public int getMoney() {
return money;
}
public void setMoney(int money) {
this.money = money;
}
public void transfer(Account target, int amount) {
if (this.money > amount) {
this.setMoney(this.getMoney() - amount);
target.setMoney(target.getMoney() + amount);
}
}
/**
简单的解决方法 对Account.class加锁
public void transfer(Account target, int amount) {
synchronized(Account.class){
if (this.money > amount) {
this.setMoney(this.getMoney() - amount);
target.setMoney(target.getMoney() + amount);
}
}
}
*/
}

这样改正行不行,为什么?

  • 不行,因为不同线程调用此方法,将会锁住不同的对象
1
2
3
4
5
6
public synchronized void transfer(Account target, int amount) {
if (this.money > amount) {
this.setMoney(this.getMoney() - amount);
target.setMoney(target.getMoney() + amount);
}
}

4.5 Monitor 概念

Java 对象头

以 32 位虚拟机为例,普通对象的对象头结构如下,其中的Klass Word为指针,指向对应的Class对象;

数组对象

其中 Mark Word 结构为

所以一个对象的结构如下:

✨✨Monitor 原理(锁)

Monitor被翻译为监视器或者说管程

每个java对象都可以关联一个Monitor,如果使用synchronized给对象上锁(重量级),该对象头的Mark Word中就被设置为指向Monitor对象的指针

  • 刚开始时Monitor中的Owner为null
  • 当Thread-2 执行synchronized(obj){}代码时就会将Monitor的所有者Owner 设置为 Thread-2,上锁成功,Monitor中同一时刻只能有一个Owner
  • 当Thread-2 占据锁时,如果线程Thread-3,Thread-4也来执行synchronized(obj){}代码,就会进入EntryList中变成BLOCKED状态
  • Thread-2 执行完同步代码块的内容,然后唤醒 EntryList 中等待的线程来竞争锁,竞争时是非公平的
  • 图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲wait-notify 时会分析

注意:

synchronized 必须是进入同一个对象的 monitor 才有上述的效果

不加 synchronized 的对象不会关联监视器,不遵从以上规则

synchronized原理

代码如下 Test17.java

1
2
3
4
5
6
7
static final Object lock = new Object();
static int counter = 0;
public static void main(String[] args) {
synchronized (lock) {
counter++;
}
}

反编译后的部分字节码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=3, args_size=1
0: getstatic #2 // <- lock引用 (synchronized开始)
3: dup
4: astore_1 // lock引用 -> slot 1
5: monitorenter // 将 lock对象 MarkWord 置为 Monitor 指针
6: getstatic #3 // <- i
9: iconst_1 // 准备常数 1
10: iadd // +1
11: putstatic #3 // -> i
14: aload_1 // <- lock引用
15: monitorexit // 将 lock对象 MarkWord 重置, 唤醒 EntryList
16: goto 24
19: astore_2 // e -> slot 2
20: aload_1 // <- lock引用
21: monitorexit // 将 lock对象 MarkWord 重置, 唤醒 EntryList
22: aload_2 // <- slot 2 (e)
23: athrow // throw e
24: return
Exception table:
from to target type
6 16 19 any
19 22 19 any
LineNumberTable:
line 8: 0
line 9: 6
line 10: 14
line 11: 24
LocalVariableTable:
Start Length Slot Name Signature
0 25 0 args [Ljava/lang/String;
StackMapTable: number_of_entries = 2
frame_type = 255 /* full_frame */
offset_delta = 19
locals = [ class "[Ljava/lang/String;", class java/lang/Object ]
stack = [ class java/lang/Throwable ]
frame_type = 250 /* chop */
offset_delta = 4

注意:方法级别的 synchronized 不会在字节码指令中有所体现

synchronized 原理进阶

轻量级锁

轻量级锁的使用场景是:如果一个对象虽然有多个线程要对它进行加锁,但是加锁的时间是错开的(也就是没有人可以竞争的,出现竞争会使用CAS+自旋的方式尝试),那么可以使用轻量级锁来进行优化。

  • 轻量级锁对使用者是透明的,即语法仍然是synchronized

假设有两个方法同步块,利用同一个对象加锁

1
2
3
4
5
6
7
8
9
10
11
12
static final Object obj = new Object();
public static void method1() {
synchronized( obj ) {
// 同步块 A
method2();
}
}
public static void method2() {
synchronized( obj ) {
// 同步块 B
}
}
  1. 每次指向到synchronized代码块时,都会创建锁记录(Lock Record)对象,每个线程都会包括一个锁记录的结构,锁记录内部可以储存对象的Mark Word和对象引用reference

  2. 让锁记录中的Object reference指向对象,并且尝试用cas(compare and sweep)替换Object对象的Mark Word ,将Mark Word 的值存入锁记录中

  3. 如果cas替换成功,那么对象的对象头储存的就是锁记录的地址和状态01,如下所示

  4. 如果cas失败,有两种情况

    1. 如果是其它线程已经持有了该Object的轻量级锁,那么表示有竞争,将进入锁膨胀阶段

    2. 如果是自己的线程已经执行了synchronized锁重入,那么那么再添加一条 Lock Record 作为重入的计数

  5. 当线程退出synchronized代码块的时候,如果获取的是取值为 null 的锁记录 ,表示有重入,这时重置锁记录,表示重入计数减一

    解锁

    1. 当线程退出synchronized代码块的时候,如果获取的锁记录取值不为 null,那么使用cas将Mark Word的值恢复给对象

    2. 成功则解锁成功

    3. 失败,则说明轻量级锁进行了锁膨胀或已经升级为重量级锁,进入重量级锁解锁流程

锁膨胀

如果在尝试加轻量级锁的过程中,cas操作无法成功,这是有一种情况就是其它线程已经为这个对象加上了轻量级锁,这是就要进行锁膨胀,将轻量级锁变成重量级锁。

  1. 当 Thread-1 进行轻量级加锁时,Thread-0 已经对该对象加了轻量级锁

  2. 这时 Thread-1 加轻量级锁失败,进入锁膨胀流程

    • 即为对象申请Monitor锁,让Object指向重量级锁地址,然后自己进入Monitor 的EntryList 变成BLOCKED状态**

  3. 当Thread-0 推出synchronized同步块时,使用cas将Mark Word的值恢复给对象头,失败,那么会进入重量级锁的解锁过程,即按照Monitor的地址找到Monitor对象,将 Owner 设置为null,唤醒 EntryList 中的Thread-1线程

自旋优化

重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即在自旋的时候持锁的线程释放了锁),那么当前线程就可以不用进行上下文切换就获得了锁

  1. 自旋重试成功的情况

  2. 自旋重试失败的情况,自旋了一定次数还是没有等到持锁的线程释放锁

自旋会占用 CPU 时间,单核 CPU 自旋就是浪费,多核 CPU 自旋才能发挥优势。

在 Java 6 之后自旋锁是自适应的,比如对象刚刚的一次自旋操作成功过,那么认为这次自旋成功的可能性会高,就多自旋几次;反之,就少自旋甚至不自旋,总之,比较智能。

Java 7 之后不能控制是否开启自旋功能

偏向锁

轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行 CAS 操作。

Java 6 中引入了偏向锁来做进一步优化:只有第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现 这个线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有

例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final Object obj = new Object();
public static void m1() {
synchronized( obj ) {
// 同步块 A
m2();
}
}
public static void m2() {
synchronized( obj ) {
// 同步块 B
m3();
}
}
public static void m3() {
synchronized( obj ) {
// 同步块 C
}
}

偏向状态

一个对象的创建过程

  1. 如果开启了偏向锁(默认是开启的),那么对象刚创建之后,Mark Word 最后三位的值101,并且这是它的Thread,epoch,age都是0,在加锁的时候进行设置这些的值

  2. 偏向锁默认是延迟的,不会在程序启动的时候立刻生效,如果想避免延迟,可以添加虚拟机参数来禁用延迟:

    • -XX:BiasedLockingStartupDelay=0来禁用延迟
  3. 注意:处于偏向锁的对象解锁后,线程 id 仍存储于对象头中

  4. 实验Test18.java,加上虚拟机参数-XX:BiasedLockingStartupDelay=0进行测试

    1
    2
    3
    4
    5
    6
    7
    8
    public static void main(String[] args) throws InterruptedException {
    Test1 t = new Test1();
    test.parseObjectHeader(getObjectHeader(t));
    synchronized (t){
    test.parseObjectHeader(getObjectHeader(t));
    }
    test.parseObjectHeader(getObjectHeader(t));
    }

    输出结果如下,三次输出的状态码都为101

    1
    2
    3
    4
    5
    6
    biasedLockFlag (1bit): 1
    LockFlag (2bit): 01
    biasedLockFlag (1bit): 1
    LockFlag (2bit): 01
    biasedLockFlag (1bit): 1
    LockFlag (2bit): 01

测试禁用:如果没有开启偏向锁,那么对象创建后最后三位的值为001,这时候它的hashcode,age都为0,hashcode是第一次用到hashcode时才赋值的。在上面测试代码运行时在添加 VM 参数-XX:-UseBiasedLocking禁用偏向锁(禁用偏向锁则优先使用轻量级锁),退出synchronized状态变回001

  1. 测试代码Test18.java 虚拟机参数-XX:-UseBiasedLocking

  2. 输出结果如下,最开始状态为001,然后加轻量级锁变成00,最后恢复成001

    1
    2
    3
    4
    5
    biasedLockFlag (1bit): 0
    LockFlag (2bit): 01
    LockFlag (2bit): 00
    biasedLockFlag (1bit): 0
    LockFlag (2bit): 01
撤销偏向锁-hashcode方法

测试 hashCode当调用对象的hashcode方法的时候就会撤销这个对象的偏向锁,因为使用偏向锁时没有位置存hashcode的值了

调用了对象的 hashCode,但偏向锁的对象 MarkWord 中存储的是线程 id,如果调用 hashCode 会导致偏向锁被 撤销

  • 轻量级锁会在锁记录中记录 hashCode
  • 重量级锁会在 Monitor 中记录 hashCode
  1. 测试代码如下,使用虚拟机参数-XX:BiasedLockingStartupDelay=0 ,确保我们的程序最开始使用了偏向锁!但是结果显示程序还是使用了轻量级锁。 Test20.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public static void main(String[] args) throws InterruptedException {
    Test1 t = new Test1();
    t.hashCode();
    test.parseObjectHeader(getObjectHeader(t));

    synchronized (t){
    test.parseObjectHeader(getObjectHeader(t));
    }
    test.parseObjectHeader(getObjectHeader(t));
    }
  2. 输出结果

    1
    2
    3
    4
    5
    biasedLockFlag (1bit): 0
    LockFlag (2bit): 01
    LockFlag (2bit): 00
    biasedLockFlag (1bit): 0
    LockFlag (2bit): 01
撤销偏向锁-其它线程使用对象

当有其它线程使用偏向锁对象时,会将偏向锁升级为轻量级锁

这里我们演示的是偏向锁撤销变成轻量级锁的过程,那么就得满足轻量级锁的使用条件,就是没有线程对同一个对象进行锁竞争,我们使用waitnotify 来辅助实现

  1. 代码 Test19.java,虚拟机参数-XX:BiasedLockingStartupDelay=0确保我们的程序最开始使用了偏向锁!

    • 输出结果,最开始使用的是偏向锁,但是第二个线程尝试获取对象锁时,发现本来对象偏向的是线程一,那么偏向锁就会失效,加的就是轻量级锁
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    biasedLockFlag (1bit): 1
    LockFlag (2bit): 01
    biasedLockFlag (1bit): 1
    LockFlag (2bit): 01
    biasedLockFlag (1bit): 1
    LockFlag (2bit): 01
    biasedLockFlag (1bit): 1
    LockFlag (2bit): 01
    LockFlag (2bit): 00
    biasedLockFlag (1bit): 0
    LockFlag (2bit): 01
撤销 - 调用 wait/notify

会使对象的锁变成重量级锁,因为wait/notify方法之后重量级锁才支持

批量重偏向

当撤销偏向锁阈值超过 40 次后,jvm 会这样觉得,自己确实偏向错了,根本就不该偏向。于是整个类的所有对象 都会变为不可偏向的,新建的对象也是不可偏向的

锁消除

每个线程都有自己的栈帧,经过逃逸分析判断不存在线程安全问题时,会进行锁消除

锁粗化

对相同对象多次加锁,导致线程发生多次重入,可以使用锁粗化方式来优化,这不同于之前讲的细分锁的粒度。

4.6 wait和notify

  • Owner 线程发现条件不满足,调用 wait 方法,即可进入 WaitSet 变为 WAITING 状态
  • BLOCKED 和 WAITING 的线程都处于阻塞状态,不占用 CPU 时间片
  • BLOCKED 线程会在 Owner 线程释放锁时唤醒
  • WAITING 线程会在 Owner 线程调用 notify 或 notifyAll 时唤醒,但唤醒后并不意味者立刻获得锁,仍需进入 EntryList 重新竞争

API 介绍

  • obj.wait() 让进入 object 监视器的线程到 waitSet 等待
  • obj.wait(long n) 有时限的等待,到 n 毫秒后结束等待,或者被notify
  • obj.notify() 在 object 上正在 waitSet 等待的线程中挑一个唤醒
  • obj.notifyAll() 让 object 上正在 waitSet 等待的线程全部唤醒

它们都是线程之间进行协作的手段,都属于Object方法,必须获得此对象的锁,才能调用这几个方法

sleep(long n)wait(long n) 的区别

  1. sleep 是 Thread 方法,而 wait 是 Object 的方法
  2. sleep 不需要强制和 synchronized 配合使用,但 wait 需要 和 synchronized 一起用
  3. sleep 在睡眠的同时,不会释放对象锁的,但 wait 在等待的时候会释放对象锁

4.6.1 同步模式之保护性暂停

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果,要点:

  1. 有一个结果需要从一个线程传递到另一个线程,让他们关联同一个 GuardedObject
  2. 如果有结果不断从一个线程到另一个线程那么可以使用消息队列(见生产者/消费者)
  3. JDK 中,join 的实现、Future 的实现,采用的就是此模式
  4. 因为要等待另一方的结果,因此归类到同步模式

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class GuardedObject {
private Object response;
private final Object lock = new Object();
//获取结果
public Object get() {
synchronized (lock) {
// 条件不满足则等待
while (response == null) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return response;
}
}
//产生结果
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
lock.notifyAll();
}
}
}

测试

一个线程等待另一个线程的执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public void run() {
GuardedObject guardedObject = MailBoxes.getGuardedObject(id);
guardedObject.set(mail);
log.debug("送信成功,id={},内容:{}",id,mail);
}
public static void main(String[] args) {
GuardedObject guardedObject = new GuardedObject();
new Thread(() -> {
try {
// 子线程执行下载
List<String> response = download();
log.debug("download complete...");
guardedObject.complete(response);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
log.debug("waiting...");
// 主线程阻塞等待
Object response = guardedObject.get();
log.debug("get response: [{}] lines", ((List<String>) response).size());
}

执行结果

1
2
3
08:42:18.568 [main] c.TestGuardedObject - waiting...
08:42:23.312 [Thread-0] c.TestGuardedObject - download complete...
08:42:23.312 [main] c.TestGuardedObject - get response: [3] lines

如果要控制超时时间呢?

  • 添加一个参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class GuardedObjectV2 {
private Object response;
private final Object lock = new Object();
public Object get(long millis) {
synchronized (lock) {
// 1) 记录最初时间
long begin = System.currentTimeMillis();
// 2) 已经经历的时间
long timePassed = 0;
while (response == null) {
// 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
// 这一轮循环应该等待的时间
long waitTime = millis - timePassed;
log.debug("waitTime: {}", waitTime);
if (waitTime <= 0) {
log.debug("break...");
break;
}
try {
lock.wait(waitTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 3) 如果提前被唤醒,这时已经经历的时间假设为 400
timePassed = System.currentTimeMillis() - begin;
log.debug("timePassed: {}, object is null {}",
timePassed, response == null);
}
return response;
}
}
public void complete(Object response) {
synchronized (lock) {
// 条件满足,通知等待线程
this.response = response;
log.debug("notify...");
lock.notifyAll();
}
}
}

虚假唤醒

虚假唤醒是一种现象,它只会出现在多线程环境中,指的是在多线程环境下,多个线程等待在同一个条件上,等到条件满足时,所有等待的线程都被唤醒,但由于多个线程执行的顺序不同,后面竞争到锁的线程在获得时间片时条件已经不再满足,线程应该继续睡眠但是却继续往下运行的一种现象。

​ 上面是比较书面化的定义,我们用人能听懂的话来介绍一下虚假唤醒。

​ 多线程环境的编程中,我们经常遇到让多个线程等待在一个条件上,等到这个条件成立的时候我们再去唤醒这些线程,让它们接着往下执行代码的场景。假如某一时刻条件成立,所有的线程都被唤醒了,然后去竞争锁,因为同一时刻只会有一个线程能拿到锁,其他的线程都会阻塞到锁上无法往下执行,等到成功争抢到锁的线程消费完条件,释放了锁,后面的线程继续运行,拿到锁时这个条件很可能已经不满足了,这个时候线程应该继续在这个条件上阻塞下去,而不应该继续执行,如果继续执行了,就说发生了虚假唤醒。

具体:https://blog.csdn.net/horacen/article/details/124233675


多任务版 GuardedObject

图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员

如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类,这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。和生产者消费者模式的区别就是:这个生产者和消费者之间是一一对应的关系,但是生产者消费者模式并不是。rpc框架的调用中就使用到了这种模式。 Test24.java

4.6.2 异步模式之生产者/消费者

要点

  1. 与前面的保护性暂停中的 GuardObject 不同,不需要产生结果和消费结果的线程一一对应
  2. 消费队列可以用来平衡生产和消费的线程资源
  3. 生产者仅负责产生结果数据,不关心数据该如何处理,而消费者专心处理结果数据
  4. 消息队列是有容量限制的,满时不会再加入数据,空时不会再消耗数据
  5. JDK 中各种阻塞队列,采用的就是这种模式

“异步”的意思就是生产者产生消息之后消息没有被立刻消费,而“同步模式”中,消息在产生之后被立刻消费了。

我们写一个线程间通信的消息队列,要注意区别,像rabbit mq等消息框架是进程间通信的。

实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class Message {
private int id;
private Object message;
public Message(int id, Object message) {
this.id = id;
this.message = message;
}
public int getId() {
return id;
}
public Object getMessage() {
return message;
}
}
class MessageQueue {
private LinkedList<Message> queue;
private int capacity;
public MessageQueue(int capacity) {
this.capacity = capacity;
queue = new LinkedList<>();
}
public Message take() {
synchronized (queue) {
while (queue.isEmpty()) {
log.debug("没货了, wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Message message = queue.removeFirst();
queue.notifyAll();
return message;
}
}
public void put(Message message) {
synchronized (queue) {
while (queue.size() == capacity) {
log.debug("库存已达上限, wait");
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(message);
queue.notifyAll();
}
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
MessageQueue messageQueue = new MessageQueue(2);
// 4 个生产者线程, 下载任务
for (int i = 0; i < 4; i++) {
int id = i;
new Thread(() -> {
try {
log.debug("download...");
List<String> response = Downloader.download();
log.debug("try put message({})", id);
messageQueue.put(new Message(id, response));
} catch (IOException e) {
e.printStackTrace();
}
}, "生产者" + i).start();
}
// 1 个消费者线程, 处理结果
new Thread(() -> {
while (true) {
Message message = messageQueue.take();
List<String> response = (List<String>) message.getMessage();
log.debug("take message({}): [{}] lines", message.getId(), response.size());
}
}, "消费者").start();

某次运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
10:48:38.070 [生产者3] c.TestProducerConsumer - download...
10:48:38.070 [生产者0] c.TestProducerConsumer - download...
10:48:38.070 [消费者] c.MessageQueue - 没货了, wait
10:48:38.070 [生产者1] c.TestProducerConsumer - download...
10:48:38.070 [生产者2] c.TestProducerConsumer - download...
10:48:41.236 [生产者1] c.TestProducerConsumer - try put message(1)
10:48:41.237 [生产者2] c.TestProducerConsumer - try put message(2)
10:48:41.236 [生产者0] c.TestProducerConsumer - try put message(0)
10:48:41.237 [生产者3] c.TestProducerConsumer - try put message(3)
10:48:41.239 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [生产者1] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(0): [3] lines
10:48:41.240 [生产者2] c.MessageQueue - 库存已达上限, wait
10:48:41.240 [消费者] c.TestProducerConsumer - take message(3): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(1): [3] lines
10:48:41.240 [消费者] c.TestProducerConsumer - take message(2): [3] lines
10:48:41.240 [消费者] c.MessageQueue - 没货了, wait

4.7 park & unpark

4.7.1 基本使用

它们是 LockSupport 类中的方法

1
2
3
4
// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark;

先 park 再 unpark

1
2
3
4
5
6
7
8
9
10
11
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(1);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
},"t1");
t1.start();
sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);

输出

1
2
3
4
18:42:52.585 c.TestParkUnpark [t1] - start... 
18:42:53.589 c.TestParkUnpark [t1] - park...
18:42:54.583 c.TestParkUnpark [main] - unpark...
18:42:54.583 c.TestParkUnpark [t1] - resume...

先 unpark 再 park

1
2
3
4
5
6
7
8
9
10
11
Thread t1 = new Thread(() -> {
log.debug("start...");
sleep(2);
log.debug("park...");
LockSupport.park();
log.debug("resume...");
}, "t1");
t1.start();
sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);

输出

1
2
3
4
18:43:50.765 c.TestParkUnpark [t1] - start... 
18:43:51.764 c.TestParkUnpark [main] - unpark...
18:43:52.769 c.TestParkUnpark [t1] - park...
18:43:52.769 c.TestParkUnpark [t1] - resume...

特点

与 Object 的 wait & notify 相比

  • wait,notify 和 notifyAll 必须配合 Object Monitor 一起使用,而 park,unpark 不必
  • park & unpark 是以线程为单位来【阻塞】和【唤醒】线程,而 notify 只能随机唤醒一个等待线程,notifyAll 是唤醒所有等待线程,就不那么【精确】
  • park & unpark 可以先 unpark,而 wait & notify 不能先 notify

4.7.2 park unpark 原理

**每个线程都有自己的一个 Parker 对象,由三部分组成 _counter, _cond和 _mutex**

  1. 打个比喻线程就像一个旅人,Parker 就像他随身携带的背包,条件变量 _ cond就好比背包中的帐篷。_counter 就好比背包中的备用干粮(0 为耗尽,1 为充足)
  2. 调用 park 就是要看需不需要停下来歇息
    1. 如果备用干粮耗尽,那么钻进帐篷歇息
    2. 如果备用干粮充足,那么不需停留,继续前进
  3. 调用 unpark,就好比令干粮充足
    1. 如果这时线程还在帐篷,就唤醒让他继续前进
    2. 如果这时线程还在运行,那么下次他调用 park 时,仅是消耗掉备用干粮,不需停留继续前进
      1. 因为背包空间有限,多次调用 unpark 仅会补充一份备用干粮

可以不看例子,直接看实现过程

先调用park再调用upark的过程

1.先调用park

  1. 当前线程调用 Unsafe.park() 方法
  2. 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁(mutex对象有个等待队列 _cond)
  3. 线程进入 _cond 条件变量阻塞
  4. 设置 _counter = 0

2.调用unpark

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 唤醒 _cond 条件变量中的 Thread_0
  3. Thread_0 恢复运行
  4. 设置 _counter 为 0


先调用unpark再调用park的过程

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 当前线程调用 Unsafe.park() 方法
  3. 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
  4. 设置 _counter 为 0

4.8 ✨✨重新理解线程状态转换

  1. NEW -- > RUNNABLE

    1. 当调用t.start()方法时,由NEW --> RUNNABLE
  2. ✨✨RUNNABLE <--> WAITING

    1. 线程用synchronized(obj)获取了对象锁后
      1. 调用obj.wait()方法时,t 线程从RUNNABLE --> WAITING
      2. 调用obj.notify()obj.notifyAll()t.interrupt()
        1. 竞争锁成功,t 线程从WAITING --> RUNNABLE
        2. 竞争锁失败,t 线程从WAITING --> BLOCKED
  3. RUNNABLE <--> WAITING

    1. 当前线程调用 LockSupport.park() 方法会让当前线程从 RUNNABLE --> WAITING
    2. 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,会让目标线程从 WAITING --> RUNNABLE
  4. RUNNABLE <--> WAITING

    1. 当前线程调用 t.join() 方法时,当前线程从 RUNNABLE --> WAITING
      • 注意是当前线程在t 线程对象的监视器上等待
    2. t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 WAITING --> RUNNABLE
  5. RUNNABLE <--> TIMED_WAITING

    t 线程用 synchronized(obj) 获取了对象锁后

    1. 调用 obj.wait(long n) 方法时,t 线程从 RUNNABLE --> TIMED_WAITING
    2. t 线程等待时间超过了 n 毫秒,或调用 obj.notify()obj.notifyAll() , t.interrupt() 时
      1. 竞争锁成功,t 线程从 TIMED_WAITING --> RUNNABLE
      2. 竞争锁失败,t 线程从 TIMED_WAITING --> BLOCKED
  6. RUNNABLE <--> TIMED_WAITING

    1. 当前线程调用 t.join(long n) 方法时,当前线程从 RUNNABLE --> TIMED_WAITING
      • 注意是当前线程在t 线程对象的监视器上等待
    2. 当前线程等待时间超过了 n 毫秒,或t 线程运行结束,或调用了当前线程的 interrupt() 时,当前线程从 TIMED_WAITING --> RUNNABLE
  7. RUNNABLE <--> TIMED_WAITING

    1. 当前线程调用 Thread.sleep(long n) ,当前线程从 RUNNABLE --> TIMED_WAITING

    2. 当前线程等待时间超过了 n 毫秒或调用了线程 的 interrupt() ,当前线程从 TIMED_WAITING --> RUNNABLE

  8. RUNNABLE <--> TIMED_WAITING

    1. 当前线程调用 LockSupport.parkNanos(long nanos)LockSupport.parkUntil(long millis) 时,当前线 程从 RUNNABLE --> TIMED_WAITING
    2. 调用 LockSupport.unpark(目标线程) 或调用了线程 的 interrupt() ,或是等待超时,会让目标线程从 TIMED_WAITING--> RUNNABLE
  9. RUNNABLE <--> BLOCKED

  • t 线程synchronized(obj) 获取了对象锁时如果竞争失败,从 RUNNABLE --> BLOCKED
  • 持 obj 锁线程的同步代码块执行完毕,会唤醒该对象上所有 BLOCKED 的线程重新竞争,如果其中 t 线程竞争 成功,从 BLOCKED --> RUNNABLE ,其它失败的线程仍然 BLOCKED
  1. RUNNABLE <--> TERMINATED
  • 当前线程所有代码运行完毕,进入 TERMINATED

4.9 多把锁(细粒度锁)

多把不相干的锁

一间大屋子有两个功能:睡觉、学习,互不相干。

现在小南要学习,小女要睡觉,但如果只用一间屋子(一个对象锁)的话,那么并发度很低

解决方法是准备多个房间(多个对象锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class BigRoom {
private final Object studyRoom = new Object();
private final Object bedRoom = new Object();
public void sleep() {
synchronized (bedRoom) {
log.debug("sleeping 2 小时");
Sleeper.sleep(2);
}
}
public void study() {
synchronized (studyRoom) {
log.debug("study 1 小时");
Sleeper.sleep(1);
}
}
}

将锁的粒度细分

  • 好处,是可以增强并发度
  • 坏处,如果一个线程需要同时获得多把锁,就容易发生死锁
  • 前提:两把锁锁住的两段代码互不相关

4.10 活跃性

活跃性相关的一系列问题都可以用ReentrantLock进行解决。

4.10.1 死锁

有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁t1 线程获得A对象锁,接下来想获取B对象的锁;t2 线程获得B对象锁,接下来想获取A对象的锁例。Test28.java

4.10.2 检测死锁jconsole

检测死锁可以使用 jconsole工具;或者使用 jps 定位进程 id,再用 jstack 定位死锁:Test28.java

下面使用jstack工具进行演示

  • Found one Java-level deadlock:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
D:\我的项目\JavaLearing\java并发编程\jdk8>jps
1156 RemoteMavenServer36
20452 Test25
9156 Launcher
23544 Jps
23848
22748 Test28

D:\我的项目\JavaLearing\java并发编程\jdk8>jstack 22748
2020-07-12 18:54:44
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.211-b12 mixed mode):

"DestroyJavaVM" #14 prio=5 os_prio=0 tid=0x0000000002a03800 nid=0x5944 waiting on condition [0x0000000000000000]
java.lang.Thread.State: RUNNABLE

//................省略了大部分内容.............//
Found one Java-level deadlock:
=============================
"线程二":
waiting to lock monitor 0x0000000002afc0e8 (object 0x00000000db9f76d0, a java.lang.Object),
which is held by "线程1"
"线程1":
waiting to lock monitor 0x0000000002afe1e8 (object 0x00000000db9f76e0, a java.lang.Object),
which is held by "线程二"

Java stack information for the threads listed above:
===================================================
"线程二":
at com.concurrent.test.Test28.lambda$main$1(Test28.java:39)
- waiting to lock <0x00000000db9f76d0> (a java.lang.Object)
- locked <0x00000000db9f76e0> (a java.lang.Object)
at com.concurrent.test.Test28$$Lambda$2/326549596.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
"线程1":
at com.concurrent.test.Test28.lambda$main$0(Test28.java:23)
- waiting to lock <0x00000000db9f76e0> (a java.lang.Object)
- locked <0x00000000db9f76d0> (a java.lang.Object)
at com.concurrent.test.Test28$$Lambda$1/1343441044.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)


4.10.3 哲学家就餐问题

有五位哲学家,围坐在圆桌旁。

他们只做两件事,思考和吃饭,思考一会吃口饭,吃完饭后接着思考。

吃饭时要用两根筷子吃,桌上共有 5 根筷子,每位哲学家左右手边各有一根筷子。

如果筷子被身边的人拿着,自己就得等待 Test29.java

  • 当每个哲学家即线程持有一根筷子时,他们都在等待另一个线程释放锁,因此造成了死锁。这种线程没有按预期结束,执行不下去的情况,归类为【活跃性】问题,除了死锁以外,还有活锁和饥饿者两种情况

4.10.4 活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class TestLiveLock {
static volatile int count = 10;
static final Object lock = new Object();
public static void main(String[] args) {
new Thread(() -> {
// 期望减到 0 退出循环
while (count > 0) {
sleep(0.2);
count--;
log.debug("count: {}", count);
}
}, "t1").start();
new Thread(() -> {
// 期望超过 20 退出循环
while (count < 20) {
sleep(0.2);
count++;
log.debug("count: {}", count);
}
}, "t2").start();
}
}

解决方式:

  • 错开线程的运行时间,使得一方不能改变另一方的结束条件。
  • 将睡眠时间调整为随机数。

4.10.5 饥饿

很多教程中把饥饿定义为,一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束,饥饿的情况不易演示,讲读写锁时会涉及饥饿问题下面我讲一下一个线程饥饿的例子,先来看看使用顺序加锁的方式解决之前的死锁问题,就是两个线程对两个不同的对象加锁的时候都使用相同的顺序进行加锁。 但是会产生饥饿问题

顺序加锁的解决方案

4.11 ReentrantLock

相对于 synchronized 它具备如下特点

  1. 可中断
  2. 可以设置超时时间
  3. 可以设置为公平锁
  4. 支持多个条件变量,即对与不满足条件的线程可以放到不同的集合中等待

与 synchronized 一样,都支持可重入

基本语法

1
2
3
4
5
6
7
8
9
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}

4.11.1 可重入

可重入是指同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁 如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
method1();
}
public static void method1() {
lock.lock();
try {
log.debug("execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
log.debug("execute method2");
method3();
} finally {
lock.unlock();
}
}
public static void method3() {
lock.lock();
try {
log.debug("execute method3");
} finally {
lock.unlock();
}
}

输出

1
2
3
17:59:11.862 [main] c.TestReentrant - execute method1 
17:59:11.865 [main] c.TestReentrant - execute method2
17:59:11.865 [main] c.TestReentrant - execute method3

4.11.2 可打断

可打断指的是处于阻塞状态等待锁的线程可以被打断等待。注意lock.lockInterruptibly()lock.trylock()方法是可打断的,lock.lock()不是。可打断的意义在于避免得不到锁的线程无限制地等待下去,防止死锁的一种方式。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("等锁的过程中被打断");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt();
log.debug("执行打断");
} finally {
lock.unlock();
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
18:02:40.520 [main] c.TestInterrupt - 获得了锁
18:02:40.524 [t1] c.TestInterrupt - 启动...
18:02:41.530 [main] c.TestInterrupt - 执行打断
java.lang.InterruptedException
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchr
onizer.java:898)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchron
izer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at cn.itcast.n4.reentrant.TestInterrupt.lambda$main$0(TestInterrupt.java:17)
at java.lang.Thread.run(Thread.java:748)
18:02:41.532 [t1] c.TestInterrupt - 等锁的过程中被打断

注意如果是不可中断模式,那么即使使用了 interrupt 也不会让等待中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
lock.lock();
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt();
log.debug("执行打断");
sleep(1);
} finally {
log.debug("释放了锁");
lock.unlock();
}

输出

1
2
3
4
5
18:06:56.261 [main] c.TestInterrupt - 获得了锁
18:06:56.265 [t1] c.TestInterrupt - 启动...
18:06:57.266 [main] c.TestInterrupt - 执行打断 // 这时 t1 并没有被真正打断, 而是仍继续等待锁
18:06:58.267 [main] c.TestInterrupt - 释放了锁
18:06:58.267 [t1] c.TestInterrupt - 获得了锁

4.11.3 锁超时

可打断指的是处于阻塞状态等待锁的线程可以被打断等待。注意lock.lockInterruptibly()lock.trylock()方法是可打断的,lock.lock()不是。可打断的意义在于避免得不到锁的线程无限制地等待下去,防止死锁的一种方式。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
log.debug("等锁的过程中被打断");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt();
log.debug("执行打断");
} finally {
lock.unlock();
}

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
18:02:40.520 [main] c.TestInterrupt - 获得了锁
18:02:40.524 [t1] c.TestInterrupt - 启动...
18:02:41.530 [main] c.TestInterrupt - 执行打断
java.lang.InterruptedException
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchr
onizer.java:898)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchron
izer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at cn.itcast.n4.reentrant.TestInterrupt.lambda$main$0(TestInterrupt.java:17)
at java.lang.Thread.run(Thread.java:748)
18:02:41.532 [t1] c.TestInterrupt - 等锁的过程中被打断

注意如果是不可中断模式,那么即使使用了 interrupt 也不会让等待中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("启动...");
lock.lock();
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("获得了锁");
t1.start();
try {
sleep(1);
t1.interrupt();
log.debug("执行打断");
sleep(1);
} finally {
log.debug("释放了锁");
lock.unlock();
}

输出

1
2
3
4
5
18:06:56.261 [main] c.TestInterrupt - 获得了锁
18:06:56.265 [t1] c.TestInterrupt - 启动...
18:06:57.266 [main] c.TestInterrupt - 执行打断 // 这时 t1 并没有被真正打断, 而是仍继续等待锁
18:06:58.267 [main] c.TestInterrupt - 释放了锁
18:06:58.267 [t1] c.TestInterrupt - 获得了锁

4.11.4 公平锁

synchronized锁中,在entrylist等待的锁在竞争时不是按照先到先得来获取锁的,所以说synchronized锁时不公平的;ReentranLock锁默认是不公平的,但是可以通过设置实现公平锁。本意是为了解决之前提到的饥饿问题,但是公平锁一般没有必要,会降低并发度,使用trylock也可以实现。

ReentrantLock 默认是不公平的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ReentrantLock lock = new ReentrantLock(false);
lock.lock();
for (int i = 0; i < 500; i++) {
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "t" + i).start();
}
// 1s 之后去争抢锁
Thread.sleep(1000);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + " start...");
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " running...");
} finally {
lock.unlock();
}
}, "强行插入").start();
lock.unlock();

强行插入,有机会在中间输出

注意:该实验不一定总能复现

1
2
3
4
5
6
7
8
9
10
11
12
t39 running... 
t40 running...
t41 running...
t42 running...
t43 running...
强行插入 start...
强行插入 running...
t44 running...
t45 running...
t46 running...
t47 running...
t49 running...

改为公平锁后

1
ReentrantLock lock = new ReentrantLock(true);

强行插入,总是在最后输出

1
2
3
4
5
6
7
8
9
10
t465 running... 
t464 running...
t477 running...
t442 running...
t468 running...
t493 running...
t482 running...
t485 running...
t481 running...
强行插入 running...

公平锁一般没有必要,会降低并发度,后面分析原理时会讲解

4.11.5 条件变量

synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待

ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

  • synchronized 是那些不满足条件的线程都在一间休息室等消息
  • 而 ReentrantLock 支持多间休息室,有专门等烟的休息室、专门等早餐的休息室、唤醒时也是按休息室来唤 醒

使用要点(使用流程)

  • await 前需要获得锁
  • await 执行后,会释放锁,进入 conditionObject 等待
  • await 的线程被唤醒(或打断、或超时)取重新竞争 lock 锁
  • 竞争 lock 锁成功后,从 await 后继续执行

例子 Condition

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
static ReentrantLock lock = new ReentrantLock();
static Condition waitCigaretteQueue = lock.newCondition();
static Condition waitbreakfastQueue = lock.newCondition();
static volatile boolean hasCigrette = false;
static volatile boolean hasBreakfast = false;
public static void main(String[] args) {
new Thread(() -> {
try {
lock.lock();
while (!hasCigrette) {
try {
waitCigaretteQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的烟");
} finally {
lock.unlock();
}
}).start();
new Thread(() -> {
try {
lock.lock();
while (!hasBreakfast) {
try {
waitbreakfastQueue.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("等到了它的早餐");
} finally {
lock.unlock();
}
}).start();
sleep(1);
sendBreakfast();
sleep(1);
sendCigarette();
}
private static void sendCigarette() {
lock.lock();
try {
log.debug("送烟来了");
hasCigrette = true;
waitCigaretteQueue.signal();
} finally {
lock.unlock();
}
}
private static void sendBreakfast() {
lock.lock();
try {
log.debug("送早餐来了");
hasBreakfast = true;
waitbreakfastQueue.signal();
} finally {
lock.unlock();
}
}

4.12 同步模式之顺序控制

  1. 固定运行顺序,比如,必须先 2 后 1 打印
  • wait notify 版 Test35.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 用来同步的对象
static Object obj = new Object();
// t2 运行标记, 代表 t2 是否执行过
static boolean t2runed = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (obj) {
// 如果 t2 没有执行过
while (!t2runed) {
try {
// t1 先等一会
obj.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
System.out.println(1);
});
Thread t2 = new Thread(() -> {
System.out.println(2);
synchronized (obj) {
// 修改运行标记
t2runed = true;
// 通知 obj 上等待的线程(可能有多个,因此需要用 notifyAll)
obj.notifyAll();
}
});
t1.start();
t2.start();
}
  1. Park Unpark 版 Test36.java

可以看到,实现上很麻烦:

  • 首先,需要保证先 wait 再 notify,否则 wait 线程永远得不到唤醒。因此使用了『运行标记』来判断该不该 wait
  • 第二,如果有些干扰线程错误地 notify 了 wait 线程,条件不满足时还要重新等待,使用了 while 循环来解决 此问题
  • 最后,唤醒对象上的 wait 线程需要使用 notifyAll,因为『同步对象』上的等待线程可能不止一个

可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:

1
2
3
4
5
6
7
8
9
10
11
12
13
Thread t1 = new Thread(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { }
// 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行
LockSupport.park();
System.out.println("1");
});
Thread t2 = new Thread(() -> {
System.out.println("2");
// 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』)
LockSupport.unpark(t1);
});
t1.start();
t2.start();

4.13 同步模式之交替输出

交替输出,线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现

1、wait notify 版 Test37.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class SyncWaitNotify {
private int flag;
private int loopNumber;
public SyncWaitNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
public void print(int waitFlag, int nextFlag, String str) {
for (int i = 0; i < loopNumber; i++) {
synchronized (this) {
while (this.flag != waitFlag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
flag = nextFlag;
this.notifyAll();
}
}
}
}

1
2
3
4
5
6
7
8
9
10
SyncWaitNotify syncWaitNotify = new SyncWaitNotify(1, 5);
new Thread(() -> {
syncWaitNotify.print(1, 2, "a");
}).start();
new Thread(() -> {
syncWaitNotify.print(2, 3, "b");
}).start();
new Thread(() -> {
syncWaitNotify.print(3, 1, "c");
}).start();

2、Lock 条件变量版 Test38.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class AwaitSignal extends ReentrantLock {
public void start(Condition first) {
this.lock();
try {
log.debug("start");
first.signal();
} finally {
this.unlock();
}
}
public void print(String str, Condition current, Condition next) {
for (int i = 0; i < loopNumber; i++) {
this.lock();
try {
current.await();
log.debug(str);
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
this.unlock();
}
}
}
// 循环次数
private int loopNumber;
public AwaitSignal(int loopNumber) {
this.loopNumber = loopNumber;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
AwaitSignal as = new AwaitSignal(5);
Condition aWaitSet = as.newCondition();
Condition bWaitSet = as.newCondition();
Condition cWaitSet = as.newCondition();
new Thread(() -> {
as.print("a", aWaitSet, bWaitSet);
}).start();
new Thread(() -> {
as.print("b", bWaitSet, cWaitSet);
}).start();
new Thread(() -> {
as.print("c", cWaitSet, aWaitSet);
}).start();
as.start(aWaitSet);

注意

该实现没有考虑 a,b,c 线程都就绪再开始

3、Park Unpark 版 Test39.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class SyncPark {
private int loopNumber;
private Thread[] threads;
public SyncPark(int loopNumber) {
this.loopNumber = loopNumber;
}
public void setThreads(Thread... threads) {
this.threads = threads;
}
public void print(String str) {
for (int i = 0; i < loopNumber; i++) {
LockSupport.park();
System.out.print(str);
LockSupport.unpark(nextThread());
}
}
private Thread nextThread() {
Thread current = Thread.currentThread();
int index = 0;
for (int i = 0; i < threads.length; i++) {
if(threads[i] == current) {
index = i;
break;
}
}
if(index < threads.length - 1) {
return threads[index+1];
} else {
return threads[0];
}
}
public void start() {
for (Thread thread : threads) {
thread.start();
}
LockSupport.unpark(threads[0]);
}
}
SyncPark syncPark = new SyncPark(5);
Thread t1 = new Thread(() -> {
syncPark.print("a");
});
Thread t2 = new Thread(() -> {
syncPark.print("b");
});
Thread t3 = new Thread(() -> {
syncPark.print("c\n");
});
syncPark.setThreads(t1, t2, t3);
syncPark.start();

本章小结

本章我们需要重点掌握的是

  • 分析多线程访问共享资源时,哪些代码片段属于临界区
  • 使用 synchronized 互斥解决临界区的线程安全问题
    • 掌握 synchronized 锁对象语法
    • 掌握 synchronzied 加载成员方法和静态方法语法
    • 掌握 wait/notify 同步方法
  • 使用 lock 互斥解决临界区的线程安全问题
    • 掌握 lock 的使用细节:可打断、锁超时、公平锁、条件变量
  • 学会分析变量的线程安全性、掌握常见线程安全类的使用
    • 线程安全类的方法是原子性的,但方法之间的组合要具体分析。
  • 了解线程活跃性问题:死锁、活锁、饥饿。
    • 解决死锁、饥饿的方式:ReentranLock
  • 应用方面
    • 互斥:使用 synchronized 或 Lock 达到共享资源互斥效果
    • 同步:使用 wait/notify 或 Lock 的条件变量来达到线程间通信效果
  • 原理方面
    • monitor、synchronized 、wait/notify 原理
    • synchronized 进阶原理
    • park & unpark 原理
  • 模式方面
    • 同步模式之保护性暂停
    • 异步模式之生产者消费者
    • 同步模式之顺序控制