0%

Java多线程

1. 初识多线程

1.2. Java 中创建线程的几种方式

完整代码

  1. Thread
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MythreadT extends Thread {
private int ticket = 5;

public MythreadT() {
//设置线程名称
super("[MythreadT]");
}

@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name + " start");
while(true){
System.out.println(name + " ticket = " + ticket--);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(ticket < 0){
break;
}
}
System.out.println(name + " end");
}
public static void main(String[] args) {
MythreadT m = new MythreadT();
m.start();
}
}
  1. Runnable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MythreadR implements Runnable {
private int ticket = 5;

@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.println(name + " start");
while(true){
System.out.println(name + " ticket = " + ticket--);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(ticket < 0){
break;
}
}
System.out.println(name + " end");
}

public static void main(String[] args) {
MythreadR m = new MythreadR();
Thread thread = new Thread(m, "[MythreadR]");
thread.start();
}

}
  1. Callable
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class MythreadC implements Callable<Integer> {
private int ticket = 5;

@Override
public Integer call() throws Exception {
String name = Thread.currentThread().getName();
System.out.println(name + " start");
while(true){
System.out.println(name + " ticket = " + ticket--);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(ticket < 0){
break;
}
}
System.out.println(name + " end");

return ticket;
}

public static void main(String[] args) {
MythreadC m = new MythreadC();
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<Integer> future = executorService.submit(m);
executorService.shutdown();
}
}
  1. 线程池

    完整代码

    1. CachedThreadPool
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public class CacheTest {

    public static void main(String[] args) {
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    for (int i=0; i < 10; i++) {
    final int ii = i;
    cachedThreadPool.execute(()->System.out.println("线程名称"+Thread.currentThread().getName() + "执行 " + ii));
    }
    }
    }
    1. FixedThreadPool
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class FixedTest {

    /**
    * 这个测试是为了测试,使用 fixedThreadPool 会导致内存溢出,使用下面的命令可以设置jvm虚拟机的参数
    * java -Xmx8m -Xms8m multithread.threadpool.FixedTest
    *
    * 导致内存溢出的原因是fixedThreadPool,是使用LinkedBlockingQueue实现的,会不断的添加线程
    * public static ExecutorService newFixedThreadPool(int nThreads) {
    * return new ThreadPoolExecutor(nThreads, nThreads,
    * 0L, TimeUnit.MILLISECONDS,
    * new LinkedBlockingQueue<Runnable>());
    * }
    * @param args
    */
    public static void main(String[] args) {
    ExecutorService e = Executors.newFixedThreadPool(5);
    for (int i=0; i<Integer.MAX_VALUE; i++) {
    final int ii = i;
    e.execute(()->System.out.println(Thread.currentThread().getName() + " execute " + ii));
    }
    }
    }

2. sleep、yield、join

2.1. sleep

该线程睡眠一段时间,将CPU让给其他线程

2.2. yield

线程调用yield,放弃当前的CPU使用权,进入等待队列,重新等待调度

2.3. join

等待另一个线程结束

3. 线程的状态

Java中的状态有6个:

  • NEW
    A thread that has not yet started is in this state.
  • RUNNABLE
    A thread executing in the Java virtual machine is in this state.
  • BLOCKED
    A thread that is blocked waiting for a monitor lock is in this state.
  • WAITING
    A thread that is waiting indefinitely for another thread to perform a particular action is in this state.
  • TIMED_WAITING
    A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.
  • TERMINATED
    A thread that has exited is in this state.

Java没有将就绪态和运行态进行区分,统一称作RUNNABLE状态

3.1. 线程状态的查看

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ThreadStatus implements Runnable {
@Override
public void run() {
for (int i = 0; i < 10; i++) {
try {
System.out.println("1." + Thread.currentThread().getName() + " status is " + Thread.currentThread().getState());
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("2." + Thread.currentThread().getName() + " status is " + Thread.currentThread().getState());
System.out.println(Thread.currentThread().getName() + " number = " + i);
}
}

public void printStatus(Thread t) {
System.out.println(t.getName() + " status is " + t.getState());
}
public static void main(String[] args) {
ThreadStatus threadStatus = new ThreadStatus();
Thread t = new Thread(threadStatus);
threadStatus.printStatus(t);
t.start();
threadStatus.printStatus(t);
try {
for (int i = 0; i < 10; i++) {
threadStatus.printStatus(t);
//Thread.sleep(80);
}
t.join();
threadStatus.printStatus(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Thread-0 status is NEW
Thread-0 status is RUNNABLE
Thread-0 status is RUNNABLE
Thread-0 status is RUNNABLE
1.Thread-0 status is RUNNABLE
Thread-0 status is BLOCKED
Thread-0 status is TIMED_WAITING
Thread-0 status is TIMED_WAITING
Thread-0 status is TIMED_WAITING
Thread-0 status is TIMED_WAITING
Thread-0 status is TIMED_WAITING
Thread-0 status is TIMED_WAITING
Thread-0 status is TIMED_WAITING
2.Thread-0 status is RUNNABLE
Thread-0 number = 0
.
.
.
1.Thread-0 status is RUNNABLE
2.Thread-0 status is RUNNABLE
Thread-0 number = 9
Thread-0 status is TERMINATED

Process finished with exit code 0

从上面的结果可以看出,调用Thread t = new Thread(threadStatus);时,线程的状态是NEW,调用start()方法后,线程状态变成RUNNABLE状态,调用sleep()进入TIMED_WAITING状态,线程执行结束后变成TERMINATED状态。

3.2. 一个问题

执行结果的第6行的状态是BLOCKED,但是这个程序中并没有使用锁,这里的BLOCKED状态不知道从何处来的

4. synchronized锁

访问临界资源的时候需要上锁,保证同一时间只能有一个线程访问临界资源。

  1. synchronized的底层实现

JDK早期版本 重量级 – 向OS申请锁

后来的改进:

锁升级的概念: 我就是厕所所长

sync (Object)

markword 记录这个线程ID(偏向锁)

如果线程争用: 升级为 自旋锁

10次以后,

升级为重量级锁 - os

执行时间短(加锁代码),线程数少,用自旋

执行时间长,线程数多,用系统锁

4.1. synchronized

完整代码

synchronized对某个对象加锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class SynchronizedTest {
private int count = 10;
private Object o = new Object();

public synchronized void m() { //等同于在方法的代码执行时要synchronized(this)
count--;
System.out.println(Thread.currentThread().getName() + "count = " + count);
}
/*
public void m() {
synchronized (this) { //任何线程要执行下面的代码时,必须先拿到this的锁
count--;
System.out.println(Thread.currentThread().getName() + "count = " + count);
}
} */

public void m1() {
synchronized (o) { //任何线程要执行下面的代码时,必须先拿到o的锁
count--;
System.out.println(Thread.currentThread().getName() + "count = " + count);
}
}

public static void main(String[] args) {
SynchronizedTest s = new SynchronizedTest();
Runnable a = new Runnable() {
@Override
public void run() {
s.run();
}
};
Thread t = new Thread(a);
Thread t1 = new Thread(a);
t.start();
t1.start();
}
}

m()函数和下面注释中的写法是等价的,m1()是synchronized锁的另一种用法,对某一个对象加锁。

4.2. 同步和非同步方法是否可以同时调用

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package multithread;

public class SynchronizedTest {
public synchronized void m2() {
System.out.println(Thread.currentThread().getName() + " m2 start ... ");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " m2 end ");
}

//非同步方法
public void m3() {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " m3 ");
}

public static void main(String[] args) {
SynchronizedTest s = new SynchronizedTest();

//测试同步非同步方法是否可以同时调用
Runnable b = new Runnable() {
@Override
public void run() {
s.m2();
}
};
Runnable b2 = new Runnable() {
@Override
public void run() {
s.m2();
}
};

new Thread(b).start();
new Thread(b2).start();
}
}

执行结果

1
2
3
4
Thread-2 m2 start ... 
Thread-2 m2 end
Thread-3 m2 start ...
Thread-3 m2 end

修改main函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
SynchronizedTest s = new SynchronizedTest();

//测试同步非同步方法是否可以同时调用
Runnable b = new Runnable() {
@Override
public void run() {
s.m2();
}
};
Runnable b2 = new Runnable() {
@Override
public void run() {
s.m3();
}
};

new Thread(b).start();
new Thread(b2).start();
}

执行结果

1
2
3
Thread-2 m2 start ... 
Thread-3 m3
Thread-2 m2 end

证明:同步和非同步方法是可以同时调用的

4.3. 同步方法调用另一个同步方法可以吗?(重入性)

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package multithread;

import java.util.concurrent.Callable;

public class ReEnterTest {
public synchronized void m1() {
System.out.println(Thread.currentThread().getName() + " m1 start ");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
m2();
System.out.println(Thread.currentThread().getName() + " m1 end");
}

public synchronized void m2() {
System.out.println(Thread.currentThread().getName() + " m2 start");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " m2 end");
}

public static void main(String[] args) {
ReEnterTest r = new ReEnterTest();
Runnable c = new Runnable() {
@Override
public void run() {
r.m1();
}
};

new Thread(c).start();
new Thread(c).start();
}
}

执行结果:

1
2
3
4
5
6
7
8
Thread-0 m1 start 
Thread-0 m2 start
Thread-0 m2 end
Thread-0 m1 end
Thread-1 m1 start
Thread-1 m2 start
Thread-1 m2 end
Thread-1 m1 end
  1. 什么是可重入?

一个线程已经拥有某个对象的锁,再次申请时仍然会得到该对象的锁,叫做可重入。

  1. 为什么需要是可重入的?

因为如果不可重入,会发生死锁。

4.4. 出现异常,默认锁会释放

程序在执行过程中,如果出现异常,默认情况下锁会被释放,所以在并发处理过程中,有异常要多加小心,不然可能会发生不一致的情况。

比如,在一个web app处理过程中,多个servlet线程共同访问同一个资源,这时如果异常处理不合适,在第一个线程中抛出异常,其他线程会进入同步代码区,有可能会访问到异常时产生的数据,因此要非常小心的处理同步业务逻辑中的异常

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package multithread;

public class ErrorTest {
int count = 0;
public synchronized void m() {
System.out.println(Thread.currentThread().getName() + " start ");
while (true) {
count ++;
System.out.println(Thread.currentThread().getName() + " count = " + count);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

if (count == 5) {
int i = 1/0;
System.out.println(i);
}
}
}

public static void main(String[] args) {
ErrorTest e = new ErrorTest();
Runnable r = new Runnable() {
@Override
public void run() {
e.m();
}
};

new Thread(r).start();
new Thread(r).start();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Thread-0 start 
Thread-0 count = 1
Thread-0 count = 2
Thread-0 count = 3
Thread-0 count = 4
Thread-0 count = 5
Thread-1 start
Exception in thread "Thread-0" Thread-1 count = 6
java.lang.ArithmeticException: / by zero
at multithread.ErrorTest.m(ErrorTest.java:17)
at multithread.ErrorTest$1.run(ErrorTest.java:28)
at java.lang.Thread.run(Thread.java:748)
Thread-1 count = 7
Thread-1 count = 8
Thread-1 count = 9
Thread-1 count = 10
Thread-1 count = 11
Thread-1 count = 12

如果不产生异常thread-1是没有机会执行的,产生异常后,thread-1就得到了执行。

4.5. 优化

锁的细化,锁住更小的代码块

有时可能也需要锁的粗化,如果需要加的锁特别多,那么也可以在较大的代码块上直接加锁

4.6. 对象做锁时,禁止改变对象

锁定某个对象o,如果o的属性发生改变,不影响锁的使用;但是如果o变成另外一个对象,则锁定的对象发生改变,应该避免将锁定对象的引用变成另外的对象

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package multithread;

public class ObjectNoChange {
/*final*/ Object o = new Object();

void m() {
synchronized (o) {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
}

public static void main(String[] args) throws InterruptedException {
ObjectNoChange o = new ObjectNoChange();

new Thread(o::m).start();

Thread.sleep(3000);

Thread t2 = new Thread(o::m, "t2");
o.o = new Object();
t2.start();
}
}

执行结果

1
2
3
4
5
6
7
8
9
Thread-0
Thread-0
Thread-0
Thread-0
t2
Thread-0
t2
Thread-0
t2

为了防止这种情况,应该为加锁对象加上final关键字

5. volatile

5.1. volatile作用

  1. 保证线程可见性
    1. MESI
    2. 缓存一致性协议
  2. 禁止指令重排序
    1. DCL单例
    2. Double Check Lock
    3. Mgr06.java

5.2. 测试volatile的作用

完整代码

  1. 不使用volatile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class VolatileTest {
boolean running = true;

public void m() {
System.out.println("m start");
while (running) {

}
System.out.println("m end");
}

public static void main(String[] args) {
VolatileTest v = new VolatileTest();
Runnable r = new Runnable() {
@Override
public void run() {
v.m();
}
};

new Thread(r).start();

try {
Thread.sleep(1000);
v.running = false;
Thread.sleep(1000);
System.out.println(v.running);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

执行结果

1
2
3
4
m start
false

Process finished with exit code 130 (interrupted by signal 2: SIGINT)

不使用volatile,程序不能停止,主线程对于running的修改没有及时更新,子线程不能感知

  1. 为running添加volatile
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package multithread;

public class VolatileTest {
volatile boolean running = true;

public void m() {
System.out.println("m start");
while (running) {

}
System.out.println("m end");
}

public static void main(String[] args) {
VolatileTest v = new VolatileTest();
Runnable r = new Runnable() {
@Override
public void run() {
v.m();
}
};

new Thread(r).start();

try {
Thread.sleep(1000);
v.running = false;
Thread.sleep(1000);
System.out.println(v.running);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

执行结果

1
2
3
m start
m end
false

5.3. volatile的局限

完整代码

volatile并不能保证多个线程共同修改running变量所带来的不一致问题,也就是说volatile不能替代synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package multithread;

import java.util.ArrayList;
import java.util.List;

public class VolatileShort {
volatile int count = 0;
void m() {
for (int i =0; i<10000; i++) {
count ++;
}
}

public static void main(String[] args) {
VolatileShort v = new VolatileShort();
List<Thread> threads = new ArrayList<>();

for (int i =0; i<10; i++) {
threads.add(new Thread(v::m));
}

threads.forEach((o)->o.start());

threads.forEach((o)-> {
try {
o.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});

System.out.println(v.count);
}
}

执行结果

1
65584

6. CAS锁(无锁优化,自旋,乐观锁)

CAS(Compare And Swap)

  1. cas(V, Expected, NewValue)

    cas 操作时会将V和E进行比较,如果过程中E发生变化,那么就回滚操作,cas操作是CPU原语支持的,不能被打断

    • if V==E
    • V = New
    • otherwise try again or fail

CPU原语支持

  1. ABA问题

    例如,线程1期望对变量a进行+1操作,在这时线程1被挂起,线程2将变量a进行-1操作,后又进行了+1操作,这时线程1得到了调度,那么线程1是不能直到变量曾经发生过变化的。

    解决方法:

    ​ 加version,每一次操作都添加一个版本号

如果是基本类型,ABA的问题是不会有什么影响的;

引用类型:你和女友复合,而期间女友谈了一些恋爱。(虽然不恰当,但比较直观,意思是你大妈已经不是你大妈了)

6.1. 对比synchronized、Atomic、LongAdder的效率

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

public class SyncAtomicLongadder {
static Long count1 = 0L;
static AtomicInteger count2 = new AtomicInteger();
static LongAdder count3 = new LongAdder();

public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[1000];
long start ,end = 0L;

// 使用synchronized
Object lock = new Object();
for (int i=0; i<threads.length; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
for (int k=0; k<100000; k++)
synchronized (lock) {
count1++;
}
}
});
}

start = System.currentTimeMillis();
for (Thread t: threads
) {
t.start();
}
for (Thread t: threads
) {
t.join();
}
end = System.currentTimeMillis();
System.out.println("sync " + (end - start));

// Atomic
for (int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
for (int k=0; k<100000; k++) {
count2.incrementAndGet();
}
});
}
start = System.currentTimeMillis();
for (Thread t: threads
) {
t.start();
}
for (Thread t: threads
) {
t.join();
}
end = System.currentTimeMillis();
System.out.println("atomic " + (end - start));


//LongAdder
for (int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
for (int k=0; k<100000; k++) {
count3.increment();
}
});
}

start = System.currentTimeMillis();
for (Thread t: threads
) {
t.start();
}
for (Thread t: threads
) {
t.join();
}
end = System.currentTimeMillis();
System.out.println("LongAdder " + (end - start));
}
}

执行结果

1
2
3
4
5
sync 4438
atomic 2195
LongAdder 377

Process finished with exit code 0

7. Semaphore

wiki-信号量

信号量(英语:semaphore)又称为信号标,是一个同步对象,用于保持在0至指定最大值之间的一个计数值。当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一;当线程完成一次对semaphore对象的释放(release)时,计数值加一。当计数值为0,则线程等待该semaphore对象不再能成功直至该semaphore对象变成signaled状态。semaphore对象的计数值大于0,为signaled状态;计数值等于0,为nonsignaled状态.

semaphore对象适用于控制一个仅支持有限个用户的共享资源,是一种不需要使用忙碌等待(busy waiting)的方法。

信号量的概念是由荷兰计算机科学家艾兹赫尔·戴克斯特拉(Edsger W. Dijkstra)发明的,广泛的应用于不同的操作系统中。在系统中,给予每一个进程一个信号量,代表每个进程目前的状态,未得到控制权的进程会在特定地方被强迫停下来,等待可以继续进行的信号到来。如果信号量是一个任意的整数,通常被称为计数信号量(Counting semaphore),或一般信号量(general semaphore);如果信号量只有二进制的0或1,称为二进制信号量(binary semaphore)。在linux系统中,二进制信号量(binary semaphore)又称互斥锁(Mutex)。

计数信号量具备两种操作动作,称为V(signal())与P(wait())(即部分参考书常称的“PV操作”)。V操作会增加信号标S的数值,P操作会减少它。

信号量通常被用来严格限制访问一个资源的线程的数量。

下面的程序模拟了一个停车场,共有3个停车位,车位满了之后,后面的车辆需要等待车位。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package multithread;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {
private static final int Max_AVALIABLE = 3;
private final Semaphore semaphore = new Semaphore(Max_AVALIABLE);

private ArrayList<Map<Object, Integer>> objects = new ArrayList<>();
private boolean[] used = new boolean[Max_AVALIABLE];

// 停车
public int park(Object o) throws InterruptedException {
semaphore.acquire();
return park1(o);
}

private synchronized int park1(Object o) {
int ret = -1;

for (int i=0; i<Max_AVALIABLE; i++) {
if (!used[i]) {
used[i] = true;
Map<Object, Integer> parkpos= new HashMap<>();
parkpos.put(o, i);
objects.add(parkpos);
ret = i;
System.out.println(Thread.currentThread().getName() + o + " park in the " + ret);
break;
}
}

return ret;
}

// 离开
public synchronized int leave(Object o) {
int ret = leave1(o);
if (ret >= 0) {
semaphore.release();
}
return ret;
}
private synchronized int leave1(Object o) {
int ret = -1;

for (Map m: objects
) {
if (m.containsKey(o)) {
Object v = m.get(o);
if (v instanceof Integer) {
used[(Integer) v] = false;
ret = (Integer) v;
Map<Object, Integer> parkpos= new HashMap<>();
parkpos.put(o, ret);
objects.remove(parkpos);
System.out.println(Thread.currentThread().getName() + o + " leave the park " + ret);
break;
}
}
}

return ret;
}

public static void main(String[] args) {
SemaphoreTest s = new SemaphoreTest();
ArrayList<Car> cars = new ArrayList<>();

for (int i=0; i<10; i++) {
Car c = new Car(i, s);
cars.add(c);
}

for (Car car: cars
) {
new Thread(car).start();
}
}
}

class Car implements Runnable{
// 车牌号
private int carNumber;
private SemaphoreTest s;

public Car(int carNumber, SemaphoreTest s) {
this.carNumber = carNumber;
this.s = s;
}

public int getCarNumber() {
return carNumber;
}

@Override
public void run() {
try {
s.park(this);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}

s.leave(this);
}

@Override
public String toString() {
return " Car{" +
"carNumber=" + carNumber +
'}';
}
}

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Thread-0 Car{carNumber=0} park in the 0
Thread-2 Car{carNumber=2} park in the 1
Thread-1 Car{carNumber=1} park in the 2
Thread-0 Car{carNumber=0} leave the park 0
Thread-2 Car{carNumber=2} leave the park 1
Thread-4 Car{carNumber=4} park in the 0
Thread-1 Car{carNumber=1} leave the park 2
Thread-3 Car{carNumber=3} park in the 1
Thread-5 Car{carNumber=5} park in the 2
Thread-4 Car{carNumber=4} leave the park 0
Thread-6 Car{carNumber=6} park in the 0
Thread-3 Car{carNumber=3} leave the park 1
Thread-5 Car{carNumber=5} leave the park 2
Thread-7 Car{carNumber=7} park in the 1
Thread-8 Car{carNumber=8} park in the 2
Thread-6 Car{carNumber=6} leave the park 0
Thread-9 Car{carNumber=9} park in the 0
Thread-7 Car{carNumber=7} leave the park 1
Thread-8 Car{carNumber=8} leave the park 2
Thread-9 Car{carNumber=9} leave the park 0

Process finished with exit code 0

8. Lock

通常,锁提供对共享资源的独占访问:一次只能有一个线程可以获取该锁,并且对共享资源的所有访问都需要首先获取该锁。 但是,某些锁可能允许并发访问共享资源,例如ReadWriteLock的读取锁。这是synchronized办不到的。

java.util.concurrent.locks

Interface Lock

All Known Implementing Classes:

ReentrantLock, ReentrantReadWriteLock.ReadLock, ReentrantReadWriteLock.WriteLock

8.1. ReentrantLock

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package multithread;

import java.util.ArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockTest {
private Lock lock = new ReentrantLock();
private int count = 10;

public void sell() {
while(count > 0) {
if (lock.tryLock()) {
try {
count--;
System.out.println(Thread.currentThread().getName() + " count = " + count);
Thread.sleep(40);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}

} else {
System.out.println(Thread.currentThread().getName() + " failed to get lock ");
}
}
}

public static void main(String[] args) {
ArrayList<Thread> threads = new ArrayList<>();
ReentrantLockTest r = new ReentrantLockTest();

for (int i=0; i<3; i++) {
threads.add(new Thread(new Runnable() {
@Override
public void run() {
r.sell();
}
}));
}

for (Thread t: threads
) {
t.start();
}
}
}

8.2. ReadWriteLock

读-读能共存,读-写不能共存,写-写不能共存

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import java.util.Random;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockTest {
static Lock lock = new ReentrantLock();
private static int value;

static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();

public static void read(Lock lock) {
try {
lock.lock();
Thread.sleep(1000);
System.out.println("read over");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public static void write(Lock lock, int v) {
try {
lock.lock();
Thread.sleep(1000);
value = v;
System.out.println("write over");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
//Runnable readR = ()->read(lock);
Runnable readR = ()->read(readLock);

//Runnable writeR = ()->write(lock, new Random().nextInt());
Runnable writeR = ()->write(writeLock, new Random().nextInt());
for (int i=0; i<18; i++) new Thread(readR).start();
for (int i=0; i<2; i++) new Thread(writeR).start();
}
}

9. 线程栅栏

有时需要多个线程都完成某项任务后才能进行下一步工作,因此需要线程栅栏。

  1. CountDownLatch

    CountDownLatch

    这个类的作用是等待一些线程全部完成工作后再进行下一项工作

  2. CyclicBarrier

    CyclicBarrier

  3. Phaser

    Phaser

    功能上与CountDownLatch和CyclicBarrier类似,但支持更灵活的用法

10. ThreadLoacl

ThreadLocal 是某一个线程自身拥有的资源,其他线程不能访问。

每一个线程都拥有一个ThreadLocalMap的map,set()函数会先获取这个map,然后再向map中添加数据,map中的key是ThreadLocal的弱引用。

为什么Entry要使用弱引用?

若是强引用,即使tl=null,但key的引用仍然指向ThreadLocal对象,所以会有内存泄露,而使用弱引用则不会。

但是还有内存泄漏的风险,ThreadLocal被回收,key的值变为null,则导致value再也无法被访问,因此不再使用里面的值之后要使用remove函数删掉map里的值。

10.1. 源码实现

ThreadLocal.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
		/**
* The entries in this hash map extend WeakReference, using
* its main ref field as the key (which is always a
* ThreadLocal object). Note that null keys (i.e. entry.get()
* == null) mean that the key is no longer referenced, so the
* entry can be expunged from table. Such entries are referred to
* as "stale entries" in the code that follows.
*/
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
/**
* Get the map associated with a ThreadLocal. Overridden in
* InheritableThreadLocal.
*
* @param t the current thread
* @return the map
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

/**
* Sets the current thread's copy of this thread-local variable
* to the specified value. Most subclasses will have no need to
* override this method, relying solely on the {@link #initialValue}
* method to set the values of thread-locals.
*
* @param value the value to be stored in the current thread's copy of
* this thread-local.
*/
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

Thread.java

1
ThreadLocal.ThreadLocalMap threadLocals = null;

Thread类拥有一个ThreadLocalMap类型的变量,每一个线程都有一个map

10.2. 代码测试

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package multithread;

public class ThreadLoaclTest {
public static void main(String[] args) {
ThreadLocal<Person> tl = new ThreadLocal<Person>();
new Thread(()-> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + tl.get());
}).start();

new Thread(()-> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
tl.set(new Person("zhangSan"));
}).start();

new Thread(()-> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
tl.set(new Person("liSi"));
System.out.println(Thread.currentThread().getName() + tl.get());
}).start();
}
}

class Person {
private String name;
Person(String name) {
this.name = name;
}

@Override
public String toString() {
return "Person{" +
"name='" + name + '\'' +
'}';
}
}

执行结果

1
2
3
4
Thread-0null
Thread-2Person{name='liSi'}

Process finished with exit code 0

11. 多线程经典问题

11.1. 生产者消费者问题

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package multithread;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class ProducerConsumer1 {
public static void main(String[] args) {
WareHouse wareHouse = new WareHouse();

ArrayList<Thread> threads = new ArrayList<>();

for (int i=0; i<10; i++) {
threads.add(new Thread(new Producer(wareHouse)));
}

for (int i=0; i<2; i++) {
threads.add(new Thread(new Consumer(wareHouse)));
}

for (Thread t: threads
) {
t.start();
}

}
}

class WareHouse {
volatile List<Integer> wareHouse = Collections.synchronizedList(new ArrayList<>());
final int Max_length = 10;

synchronized void produce() {
while (wareHouse.size() == Max_length) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int a = new Random().nextInt(30);
wareHouse.add(a);
notifyAll();
System.out.println(Thread.currentThread().getName() + " produce " + a
+ " wareHouse size is " + wareHouse.size());
}

synchronized void consume() {
//仓库已空
while (wareHouse.size() == 0) {
try {
wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int a = wareHouse.remove(0);
notifyAll();
System.out.println(Thread.currentThread().getName() + " get " + a
+ " wareHouse size is " + wareHouse.size());
}

synchronized int getSize() {
return wareHouse.size();
}
}

class Producer implements Runnable {
private WareHouse wareHouse;

Producer(WareHouse wareHouse) {
this.wareHouse = wareHouse;
}

@Override
public void run() {
while (true) {
wareHouse.produce();
}
}
}

class Consumer implements Runnable {
private WareHouse wareHouse;
Consumer(WareHouse wareHouse) {
this.wareHouse = wareHouse;
}

@Override
public void run() {
while(true) {
wareHouse.consume();
}
}
}

执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
.
.
.
Thread-6 get 28 wareHouse size is 9
Thread-6 get 19 wareHouse size is 9
Thread-6 get 9 wareHouse size is 9
Thread-5 get 12 wareHouse size is 9
Thread-2 produce 0 wareHouse size is 9
Thread-5 get 3 wareHouse size is 8
Thread-4 produce 22 wareHouse size is 10
Thread-0 produce 2 wareHouse size is 10
Thread-6 get 3 wareHouse size is 8
Thread-6 get 28 wareHouse size is 9
Thread-6 get 28 wareHouse size is 9
.
.
.