1. 初识多线程 1.2. Java 中创建线程的几种方式
完整代码
Thread
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class MythreadT extends Thread { private int ticket = 5 ; public MythreadT () { super ("[MythreadT]" ); } @Override public void run () { String name = Thread.currentThread().getName(); System.out.println(name + " start" ); while (true ){ System.out.println(name + " ticket = " + ticket--); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } if (ticket < 0 ){ break ; } } System.out.println(name + " end" ); } public static void main (String[] args) { MythreadT m = new MythreadT(); m.start(); } }
Runnable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class MythreadR implements Runnable { private int ticket = 5 ; @Override public void run () { String name = Thread.currentThread().getName(); System.out.println(name + " start" ); while (true ){ System.out.println(name + " ticket = " + ticket--); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } if (ticket < 0 ){ break ; } } System.out.println(name + " end" ); } public static void main (String[] args) { MythreadR m = new MythreadR(); Thread thread = new Thread(m, "[MythreadR]" ); thread.start(); } }
Callable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class MythreadC implements Callable <Integer > { private int ticket = 5 ; @Override public Integer call () throws Exception { String name = Thread.currentThread().getName(); System.out.println(name + " start" ); while (true ){ System.out.println(name + " ticket = " + ticket--); try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } if (ticket < 0 ){ break ; } } System.out.println(name + " end" ); return ticket; } public static void main (String[] args) { MythreadC m = new MythreadC(); ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<Integer> future = executorService.submit(m); executorService.shutdown(); } }
线程池
完整代码
CachedThreadPool
1 2 3 4 5 6 7 8 9 10 public class CacheTest { public static void main (String[] args) { ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i=0 ; i < 10 ; i++) { final int ii = i; cachedThreadPool.execute(()->System.out.println("线程名称" +Thread.currentThread().getName() + "执行 " + ii)); } } }
FixedThreadPool
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class FixedTest { public static void main (String[] args) { ExecutorService e = Executors.newFixedThreadPool(5 ); for (int i=0 ; i<Integer.MAX_VALUE; i++) { final int ii = i; e.execute(()->System.out.println(Thread.currentThread().getName() + " execute " + ii)); } } }
2. sleep、yield、join 2.1. sleep 该线程睡眠一段时间,将CPU让给其他线程
2.2. yield 线程调用yield,放弃当前的CPU使用权,进入等待队列,重新等待调度
2.3. join 等待另一个线程结束
3. 线程的状态
Java中的状态有6个:
NEW
A thread that has not yet started is in this state.
RUNNABLE
A thread executing in the Java virtual machine is in this state.
BLOCKED
A thread that is blocked waiting for a monitor lock is in this state.
WAITING
A thread that is waiting indefinitely for another thread to perform a particular action is in this state.
TIMED_WAITING
A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.
TERMINATED
A thread that has exited is in this state.
Java没有将就绪态和运行态进行区分,统一称作RUNNABLE状态
3.1. 线程状态的查看 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class ThreadStatus implements Runnable { @Override public void run () { for (int i = 0 ; i < 10 ; i++) { try { System.out.println("1." + Thread.currentThread().getName() + " status is " + Thread.currentThread().getState()); Thread.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("2." + Thread.currentThread().getName() + " status is " + Thread.currentThread().getState()); System.out.println(Thread.currentThread().getName() + " number = " + i); } } public void printStatus (Thread t) { System.out.println(t.getName() + " status is " + t.getState()); } public static void main (String[] args) { ThreadStatus threadStatus = new ThreadStatus(); Thread t = new Thread(threadStatus); threadStatus.printStatus(t); t.start(); threadStatus.printStatus(t); try { for (int i = 0 ; i < 10 ; i++) { threadStatus.printStatus(t); } t.join(); threadStatus.printStatus(t); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 Thread-0 status is NEW Thread-0 status is RUNNABLE Thread-0 status is RUNNABLE Thread-0 status is RUNNABLE 1.Thread-0 status is RUNNABLE Thread-0 status is BLOCKED Thread-0 status is TIMED_WAITING Thread-0 status is TIMED_WAITING Thread-0 status is TIMED_WAITING Thread-0 status is TIMED_WAITING Thread-0 status is TIMED_WAITING Thread-0 status is TIMED_WAITING Thread-0 status is TIMED_WAITING 2.Thread-0 status is RUNNABLE Thread-0 number = 0 . . . 1.Thread-0 status is RUNNABLE 2.Thread-0 status is RUNNABLE Thread-0 number = 9 Thread-0 status is TERMINATED Process finished with exit code 0
从上面的结果可以看出,调用Thread t = new Thread(threadStatus);时,线程的状态是NEW,调用start()方法后,线程状态变成RUNNABLE状态,调用sleep()进入TIMED_WAITING状态,线程执行结束后变成TERMINATED状态。
3.2. 一个问题 执行结果的第6行的状态是BLOCKED,但是这个程序中并没有使用锁,这里的BLOCKED状态不知道从何处来的
4. synchronized锁 访问临界资源的时候需要上锁,保证同一时间只能有一个线程访问临界资源。
synchronized的底层实现
JDK早期版本 重量级 – 向OS申请锁
后来的改进:
锁升级的概念: 我就是厕所所长
sync (Object)
markword 记录这个线程ID(偏向锁)
如果线程争用: 升级为 自旋锁
10次以后,
升级为重量级锁 - os
执行时间短(加锁代码),线程数少,用自旋
执行时间长,线程数多,用系统锁
4.1. synchronized 完整代码
synchronized对某个对象加锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class SynchronizedTest { private int count = 10 ; private Object o = new Object(); public synchronized void m () { count--; System.out.println(Thread.currentThread().getName() + "count = " + count); } public void m1 () { synchronized (o) { count--; System.out.println(Thread.currentThread().getName() + "count = " + count); } } public static void main (String[] args) { SynchronizedTest s = new SynchronizedTest(); Runnable a = new Runnable() { @Override public void run () { s.run(); } }; Thread t = new Thread(a); Thread t1 = new Thread(a); t.start(); t1.start(); } }
m()函数和下面注释中的写法是等价的,m1()是synchronized锁的另一种用法,对某一个对象加锁。
4.2. 同步和非同步方法是否可以同时调用 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package multithread;public class SynchronizedTest { public synchronized void m2 () { System.out.println(Thread.currentThread().getName() + " m2 start ... " ); try { Thread.sleep(10000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " m2 end " ); } public void m3 () { try { Thread.sleep(5000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " m3 " ); } public static void main (String[] args) { SynchronizedTest s = new SynchronizedTest(); Runnable b = new Runnable() { @Override public void run () { s.m2(); } }; Runnable b2 = new Runnable() { @Override public void run () { s.m2(); } }; new Thread(b).start(); new Thread(b2).start(); } }
执行结果
1 2 3 4 Thread-2 m2 start ... Thread-2 m2 end Thread-3 m2 start ... Thread-3 m2 end
修改main函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void main (String[] args) { SynchronizedTest s = new SynchronizedTest(); Runnable b = new Runnable() { @Override public void run () { s.m2(); } }; Runnable b2 = new Runnable() { @Override public void run () { s.m3(); } }; new Thread(b).start(); new Thread(b2).start(); }
执行结果
1 2 3 Thread-2 m2 start ... Thread-3 m3 Thread-2 m2 end
证明:同步和非同步方法是可以同时调用的
4.3. 同步方法调用另一个同步方法可以吗?(重入性) 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package multithread;import java.util.concurrent.Callable;public class ReEnterTest { public synchronized void m1 () { System.out.println(Thread.currentThread().getName() + " m1 start " ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } m2(); System.out.println(Thread.currentThread().getName() + " m1 end" ); } public synchronized void m2 () { System.out.println(Thread.currentThread().getName() + " m2 start" ); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " m2 end" ); } public static void main (String[] args) { ReEnterTest r = new ReEnterTest(); Runnable c = new Runnable() { @Override public void run () { r.m1(); } }; new Thread(c).start(); new Thread(c).start(); } }
执行结果:
1 2 3 4 5 6 7 8 Thread-0 m1 start Thread-0 m2 start Thread-0 m2 end Thread-0 m1 end Thread-1 m1 start Thread-1 m2 start Thread-1 m2 end Thread-1 m1 end
什么是可重入?
一个线程已经拥有某个对象的锁,再次申请时仍然会得到该对象的锁,叫做可重入。
为什么需要是可重入的?
因为如果不可重入,会发生死锁。
4.4. 出现异常,默认锁会释放 程序在执行过程中,如果出现异常,默认情况下锁会被释放,所以在并发处理过程中,有异常要多加小心,不然可能会发生不一致的情况。
比如,在一个web app处理过程中,多个servlet线程共同访问同一个资源,这时如果异常处理不合适,在第一个线程中抛出异常,其他线程会进入同步代码区,有可能会访问到异常时产生的数据,因此要非常小心的处理同步业务逻辑中的异常
完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package multithread;public class ErrorTest { int count = 0 ; public synchronized void m () { System.out.println(Thread.currentThread().getName() + " start " ); while (true ) { count ++; System.out.println(Thread.currentThread().getName() + " count = " + count); try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } if (count == 5 ) { int i = 1 /0 ; System.out.println(i); } } } public static void main (String[] args) { ErrorTest e = new ErrorTest(); Runnable r = new Runnable() { @Override public void run () { e.m(); } }; new Thread(r).start(); new Thread(r).start(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Thread-0 start Thread-0 count = 1 Thread-0 count = 2 Thread-0 count = 3 Thread-0 count = 4 Thread-0 count = 5 Thread-1 start Exception in thread "Thread-0" Thread-1 count = 6 java.lang.ArithmeticException: / by zero at multithread.ErrorTest.m(ErrorTest.java:17) at multithread.ErrorTest$1.run(ErrorTest.java:28) at java.lang.Thread.run(Thread.java:748) Thread-1 count = 7 Thread-1 count = 8 Thread-1 count = 9 Thread-1 count = 10 Thread-1 count = 11 Thread-1 count = 12
如果不产生异常thread-1是没有机会执行的,产生异常后,thread-1就得到了执行。
4.5. 优化 锁的细化,锁住更小的代码块
有时可能也需要锁的粗化,如果需要加的锁特别多,那么也可以在较大的代码块上直接加锁
4.6. 对象做锁时,禁止改变对象 锁定某个对象o,如果o的属性发生改变,不影响锁的使用;但是如果o变成另外一个对象,则锁定的对象发生改变,应该避免将锁定对象的引用变成另外的对象
完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package multithread;public class ObjectNoChange { Object o = new Object(); void m () { synchronized (o) { while (true ) { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } } } public static void main (String[] args) throws InterruptedException { ObjectNoChange o = new ObjectNoChange(); new Thread(o::m).start(); Thread.sleep(3000 ); Thread t2 = new Thread(o::m, "t2" ); o.o = new Object(); t2.start(); } }
执行结果
1 2 3 4 5 6 7 8 9 Thread-0 Thread-0 Thread-0 Thread-0 t2 Thread-0 t2 Thread-0 t2
为了防止这种情况,应该为加锁对象加上final关键字
5. volatile 5.1. volatile作用
保证线程可见性
MESI
缓存一致性协议
禁止指令重排序
DCL单例
Double Check Lock
Mgr06.java
5.2. 测试volatile的作用 完整代码
不使用volatile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class VolatileTest { boolean running = true ; public void m () { System.out.println("m start" ); while (running) { } System.out.println("m end" ); } public static void main (String[] args) { VolatileTest v = new VolatileTest(); Runnable r = new Runnable() { @Override public void run () { v.m(); } }; new Thread(r).start(); try { Thread.sleep(1000 ); v.running = false ; Thread.sleep(1000 ); System.out.println(v.running); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果
1 2 3 4 m start false Process finished with exit code 130 (interrupted by signal 2: SIGINT)
不使用volatile,程序不能停止,主线程对于running的修改没有及时更新,子线程不能感知
为running添加volatile
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package multithread;public class VolatileTest { volatile boolean running = true ; public void m () { System.out.println("m start" ); while (running) { } System.out.println("m end" ); } public static void main (String[] args) { VolatileTest v = new VolatileTest(); Runnable r = new Runnable() { @Override public void run () { v.m(); } }; new Thread(r).start(); try { Thread.sleep(1000 ); v.running = false ; Thread.sleep(1000 ); System.out.println(v.running); } catch (InterruptedException e) { e.printStackTrace(); } } }
执行结果
5.3. volatile的局限 完整代码
volatile并不能保证多个线程共同修改running变量所带来的不一致问题,也就是说volatile不能替代synchronized
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package multithread;import java.util.ArrayList;import java.util.List;public class VolatileShort { volatile int count = 0 ; void m () { for (int i =0 ; i<10000 ; i++) { count ++; } } public static void main (String[] args) { VolatileShort v = new VolatileShort(); List<Thread> threads = new ArrayList<>(); for (int i =0 ; i<10 ; i++) { threads.add(new Thread(v::m)); } threads.forEach((o)->o.start()); threads.forEach((o)-> { try { o.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(v.count); } }
执行结果
6. CAS锁(无锁优化,自旋,乐观锁) CAS(Compare And Swap)
cas(V, Expected, NewValue)
cas 操作时会将V和E进行比较,如果过程中E发生变化,那么就回滚操作,cas操作是CPU原语支持的,不能被打断
if V==E
V = New
otherwise try again or fail
CPU原语支持
ABA问题
例如,线程1期望对变量a进行+1操作,在这时线程1被挂起,线程2将变量a进行-1操作,后又进行了+1操作,这时线程1得到了调度,那么线程1是不能直到变量曾经发生过变化的。
解决方法:
加version,每一次操作都添加一个版本号
如果是基本类型,ABA的问题是不会有什么影响的;
引用类型:你和女友复合,而期间女友谈了一些恋爱。(虽然不恰当,但比较直观,意思是你大妈已经不是你大妈了)
6.1. 对比synchronized、Atomic、LongAdder的效率 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.LongAdder;public class SyncAtomicLongadder { static Long count1 = 0L ; static AtomicInteger count2 = new AtomicInteger(); static LongAdder count3 = new LongAdder(); public static void main (String[] args) throws InterruptedException { Thread[] threads = new Thread[1000 ]; long start ,end = 0L ; Object lock = new Object(); for (int i=0 ; i<threads.length; i++) { threads[i] = new Thread(new Runnable() { @Override public void run () { for (int k=0 ; k<100000 ; k++) synchronized (lock) { count1++; } } }); } start = System.currentTimeMillis(); for (Thread t: threads ) { t.start(); } for (Thread t: threads ) { t.join(); } end = System.currentTimeMillis(); System.out.println("sync " + (end - start)); for (int i=0 ; i<threads.length; i++) { threads[i] = new Thread(()->{ for (int k=0 ; k<100000 ; k++) { count2.incrementAndGet(); } }); } start = System.currentTimeMillis(); for (Thread t: threads ) { t.start(); } for (Thread t: threads ) { t.join(); } end = System.currentTimeMillis(); System.out.println("atomic " + (end - start)); for (int i=0 ; i<threads.length; i++) { threads[i] = new Thread(()->{ for (int k=0 ; k<100000 ; k++) { count3.increment(); } }); } start = System.currentTimeMillis(); for (Thread t: threads ) { t.start(); } for (Thread t: threads ) { t.join(); } end = System.currentTimeMillis(); System.out.println("LongAdder " + (end - start)); } }
执行结果
1 2 3 4 5 sync 4438 atomic 2195 LongAdder 377 Process finished with exit code 0
7. Semaphore wiki-信号量
信号量 (英语:semaphore )又称为信号标 ,是一个同步对象,用于保持在0至指定最大值之间的一个计数值。当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一;当线程完成一次对semaphore对象的释放(release)时,计数值加一。当计数值为0,则线程等待该semaphore对象不再能成功直至该semaphore对象变成signaled状态。semaphore对象的计数值大于0,为signaled状态;计数值等于0,为nonsignaled状态.
semaphore对象适用于控制一个仅支持有限个用户的共享资源,是一种不需要使用忙碌等待 (busy waiting)的方法。
信号量的概念是由荷兰 计算机科学家艾兹赫尔·戴克斯特拉 (Edsger W. Dijkstra)发明的,广泛的应用于不同的操作系统 中。在系统中,给予每一个进程 一个信号量,代表每个进程目前的状态,未得到控制权的进程会在特定地方被强迫停下来,等待可以继续进行的信号到来。如果信号量是一个任意的整数,通常被称为计数信号量(Counting semaphore),或一般信号量(general semaphore);如果信号量只有二进制的0或1,称为二进制信号量(binary semaphore)。在linux系统中,二进制信号量(binary semaphore)又称互斥锁 (Mutex)。
计数信号量具备两种操作动作,称为V(signal()
)与P(wait()
)(即部分参考书常称的“PV操作”)。V操作会增加信号标S的数值,P操作会减少它。
信号量通常被用来严格限制访问一个资源的线程的数量。
下面的程序模拟了一个停车场,共有3个停车位,车位满了之后,后面的车辆需要等待车位。
完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 package multithread;import java.util.ArrayList;import java.util.HashMap;import java.util.Map;import java.util.concurrent.Semaphore;public class SemaphoreTest { private static final int Max_AVALIABLE = 3 ; private final Semaphore semaphore = new Semaphore(Max_AVALIABLE); private ArrayList<Map<Object, Integer>> objects = new ArrayList<>(); private boolean [] used = new boolean [Max_AVALIABLE]; public int park (Object o) throws InterruptedException { semaphore.acquire(); return park1(o); } private synchronized int park1 (Object o) { int ret = -1 ; for (int i=0 ; i<Max_AVALIABLE; i++) { if (!used[i]) { used[i] = true ; Map<Object, Integer> parkpos= new HashMap<>(); parkpos.put(o, i); objects.add(parkpos); ret = i; System.out.println(Thread.currentThread().getName() + o + " park in the " + ret); break ; } } return ret; } public synchronized int leave (Object o) { int ret = leave1(o); if (ret >= 0 ) { semaphore.release(); } return ret; } private synchronized int leave1 (Object o) { int ret = -1 ; for (Map m: objects ) { if (m.containsKey(o)) { Object v = m.get(o); if (v instanceof Integer) { used[(Integer) v] = false ; ret = (Integer) v; Map<Object, Integer> parkpos= new HashMap<>(); parkpos.put(o, ret); objects.remove(parkpos); System.out.println(Thread.currentThread().getName() + o + " leave the park " + ret); break ; } } } return ret; } public static void main (String[] args) { SemaphoreTest s = new SemaphoreTest(); ArrayList<Car> cars = new ArrayList<>(); for (int i=0 ; i<10 ; i++) { Car c = new Car(i, s); cars.add(c); } for (Car car: cars ) { new Thread(car).start(); } } } class Car implements Runnable { private int carNumber; private SemaphoreTest s; public Car (int carNumber, SemaphoreTest s) { this .carNumber = carNumber; this .s = s; } public int getCarNumber () { return carNumber; } @Override public void run () { try { s.park(this ); Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } s.leave(this ); } @Override public String toString () { return " Car{" + "carNumber=" + carNumber + '}' ; } }
执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 Thread-0 Car{carNumber=0} park in the 0 Thread-2 Car{carNumber=2} park in the 1 Thread-1 Car{carNumber=1} park in the 2 Thread-0 Car{carNumber=0} leave the park 0 Thread-2 Car{carNumber=2} leave the park 1 Thread-4 Car{carNumber=4} park in the 0 Thread-1 Car{carNumber=1} leave the park 2 Thread-3 Car{carNumber=3} park in the 1 Thread-5 Car{carNumber=5} park in the 2 Thread-4 Car{carNumber=4} leave the park 0 Thread-6 Car{carNumber=6} park in the 0 Thread-3 Car{carNumber=3} leave the park 1 Thread-5 Car{carNumber=5} leave the park 2 Thread-7 Car{carNumber=7} park in the 1 Thread-8 Car{carNumber=8} park in the 2 Thread-6 Car{carNumber=6} leave the park 0 Thread-9 Car{carNumber=9} park in the 0 Thread-7 Car{carNumber=7} leave the park 1 Thread-8 Car{carNumber=8} leave the park 2 Thread-9 Car{carNumber=9} leave the park 0 Process finished with exit code 0
8. Lock 通常,锁提供对共享资源的独占访问:一次只能有一个线程可以获取该锁,并且对共享资源的所有访问都需要首先获取该锁。 但是,某些锁可能允许并发访问共享资源,例如ReadWriteLock的读取锁。这是synchronized办不到的。
java.util.concurrent.locks
Interface Lock All Known Implementing Classes:
ReentrantLock , ReentrantReadWriteLock.ReadLock , ReentrantReadWriteLock.WriteLock
8.1. ReentrantLock 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package multithread;import java.util.ArrayList;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ReentrantLockTest { private Lock lock = new ReentrantLock(); private int count = 10 ; public void sell () { while (count > 0 ) { if (lock.tryLock()) { try { count--; System.out.println(Thread.currentThread().getName() + " count = " + count); Thread.sleep(40 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } else { System.out.println(Thread.currentThread().getName() + " failed to get lock " ); } } } public static void main (String[] args) { ArrayList<Thread> threads = new ArrayList<>(); ReentrantLockTest r = new ReentrantLockTest(); for (int i=0 ; i<3 ; i++) { threads.add(new Thread(new Runnable() { @Override public void run () { r.sell(); } })); } for (Thread t: threads ) { t.start(); } } }
8.2. ReadWriteLock 读-读能共存,读-写不能共存,写-写不能共存
完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 import java.util.Random;import java.util.concurrent.atomic.LongAdder;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantLock;import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReadWriteLockTest { static Lock lock = new ReentrantLock(); private static int value; static ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); static Lock readLock = readWriteLock.readLock(); static Lock writeLock = readWriteLock.writeLock(); public static void read (Lock lock) { try { lock.lock(); Thread.sleep(1000 ); System.out.println("read over" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void write (Lock lock, int v) { try { lock.lock(); Thread.sleep(1000 ); value = v; System.out.println("write over" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main (String[] args) { Runnable readR = ()->read(readLock); Runnable writeR = ()->write(writeLock, new Random().nextInt()); for (int i=0 ; i<18 ; i++) new Thread(readR).start(); for (int i=0 ; i<2 ; i++) new Thread(writeR).start(); } }
9. 线程栅栏 有时需要多个线程都完成某项任务后才能进行下一步工作,因此需要线程栅栏。
CountDownLatch
CountDownLatch
这个类的作用是等待一些线程全部完成工作后再进行下一项工作
CyclicBarrier
CyclicBarrier
Phaser
Phaser
功能上与CountDownLatch和CyclicBarrier类似,但支持更灵活的用法
10. ThreadLoacl ThreadLocal 是某一个线程自身拥有的资源,其他线程不能访问。
每一个线程都拥有一个ThreadLocalMap的map,set()函数会先获取这个map,然后再向map中添加数据,map中的key是ThreadLocal的弱引用。
为什么Entry要使用弱引用?
若是强引用,即使tl=null,但key的引用仍然指向ThreadLocal对象,所以会有内存泄露,而使用弱引用则不会。
但是还有内存泄漏的风险,ThreadLocal被回收,key的值变为null,则导致value再也无法被访问,因此不再使用里面的值之后要使用remove函数删掉map里的值。
10.1. 源码实现 ThreadLocal.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 static class Entry extends WeakReference <ThreadLocal <?>> { Object value; Entry(ThreadLocal<?> k, Object v) { super (k); value = v; } } ThreadLocalMap getMap (Thread t) { return t.threadLocals; } public void set (T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null ) map.set(this , value); else createMap(t, value); }
Thread.java
1 ThreadLocal.ThreadLocalMap threadLocals = null ;
Thread类拥有一个ThreadLocalMap类型的变量,每一个线程都有一个map
10.2. 代码测试 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package multithread;public class ThreadLoaclTest { public static void main (String[] args) { ThreadLocal<Person> tl = new ThreadLocal<Person>(); new Thread(()-> { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + tl.get()); }).start(); new Thread(()-> { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } tl.set(new Person("zhangSan" )); }).start(); new Thread(()-> { try { Thread.sleep(2000 ); } catch (InterruptedException e) { e.printStackTrace(); } tl.set(new Person("liSi" )); System.out.println(Thread.currentThread().getName() + tl.get()); }).start(); } } class Person { private String name; Person(String name) { this .name = name; } @Override public String toString () { return "Person{" + "name='" + name + '\'' + '}' ; } }
执行结果
1 2 3 4 Thread-0null Thread-2Person{name='liSi'} Process finished with exit code 0
11. 多线程经典问题 11.1. 生产者消费者问题 完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 package multithread;import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.Random;public class ProducerConsumer1 { public static void main (String[] args) { WareHouse wareHouse = new WareHouse(); ArrayList<Thread> threads = new ArrayList<>(); for (int i=0 ; i<10 ; i++) { threads.add(new Thread(new Producer(wareHouse))); } for (int i=0 ; i<2 ; i++) { threads.add(new Thread(new Consumer(wareHouse))); } for (Thread t: threads ) { t.start(); } } } class WareHouse { volatile List<Integer> wareHouse = Collections.synchronizedList(new ArrayList<>()); final int Max_length = 10 ; synchronized void produce () { while (wareHouse.size() == Max_length) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } int a = new Random().nextInt(30 ); wareHouse.add(a); notifyAll(); System.out.println(Thread.currentThread().getName() + " produce " + a + " wareHouse size is " + wareHouse.size()); } synchronized void consume () { while (wareHouse.size() == 0 ) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } int a = wareHouse.remove(0 ); notifyAll(); System.out.println(Thread.currentThread().getName() + " get " + a + " wareHouse size is " + wareHouse.size()); } synchronized int getSize () { return wareHouse.size(); } } class Producer implements Runnable { private WareHouse wareHouse; Producer(WareHouse wareHouse) { this .wareHouse = wareHouse; } @Override public void run () { while (true ) { wareHouse.produce(); } } } class Consumer implements Runnable { private WareHouse wareHouse; Consumer(WareHouse wareHouse) { this .wareHouse = wareHouse; } @Override public void run () { while (true ) { wareHouse.consume(); } } }
执行结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 . . . Thread-6 get 28 wareHouse size is 9 Thread-6 get 19 wareHouse size is 9 Thread-6 get 9 wareHouse size is 9 Thread-5 get 12 wareHouse size is 9 Thread-2 produce 0 wareHouse size is 9 Thread-5 get 3 wareHouse size is 8 Thread-4 produce 22 wareHouse size is 10 Thread-0 produce 2 wareHouse size is 10 Thread-6 get 3 wareHouse size is 8 Thread-6 get 28 wareHouse size is 9 Thread-6 get 28 wareHouse size is 9 . . .