8.共享模型之工具 8.1 线程池 8.1.1 自定义线程池
步骤1:自定义拒绝策略接口
1 2 3 4 @FunctionalInterface interface RejectPolicy <T>{ void reject (BlockingQueue<T> queue,T task) ; }
步骤2:自定义任务队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 class BlockingQueue <T>{ private Deque<T> queue = new ArrayDeque <>(); private int capacity; private ReentrantLock lock = new ReentrantLock (); private Condition fullWaitSet = lock.newCondition(); private Condition emptyWaitSet = lock.newCondition(); public BlockingQueue (int capacity) { this .capacity = capacity; } public T poll (long timeout, TimeUnit unit) { lock.lock(); long nanoTime = unit.toNanos(timeout); try { while (queue.size() == 0 ){ try { if (nanoTime <= 0 ){ return null ; } nanoTime = emptyWaitSet.awaitNanos(nanoTime); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.pollFirst(); fullWaitSet.signal(); return t; }finally { lock.unlock(); } } public T take () { lock.lock(); try { while (queue.size() == 0 ){ try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.pollFirst(); fullWaitSet.signal(); return t; }finally { lock.unlock(); } } public void put (T t) { lock.lock(); try { while (queue.size() == capacity){ try { System.out.println(Thread.currentThread().toString() + "等待加入任务队列:" + t.toString()); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().toString() + "加入任务队列:" + t.toString()); queue.addLast(t); emptyWaitSet.signal(); }finally { lock.unlock(); } } public boolean offer (T t,long timeout,TimeUnit timeUnit) { lock.lock(); try { long nanoTime = timeUnit.toNanos(timeout); while (queue.size() == capacity){ try { if (nanoTime <= 0 ){ System.out.println("等待超时,加入失败:" + t); return false ; } System.out.println(Thread.currentThread().toString() + "等待加入任务队列:" + t.toString()); nanoTime = fullWaitSet.awaitNanos(nanoTime); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().toString() + "加入任务队列:" + t.toString()); queue.addLast(t); emptyWaitSet.signal(); return true ; }finally { lock.unlock(); } } public int size () { lock.lock(); try { return queue.size(); }finally { lock.unlock(); } } public void tryPut (RejectPolicy<T> rejectPolicy,T task) { lock.lock(); try { if (queue.size() == capacity){ rejectPolicy.reject(this ,task); }else { System.out.println("加入任务队列:" + task); queue.addLast(task); emptyWaitSet.signal(); } }finally { lock.unlock(); } } }
步骤3:自定义线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 class ThreadPool { BlockingQueue<Runnable> taskQue; HashSet<Worker> workers = new HashSet <>(); private RejectPolicy<Runnable> rejectPolicy; public ThreadPool (int coreSize,long timeout,TimeUnit timeUnit,int queueCapacity,RejectPolicy<Runnable> rejectPolicy) { this .coreSize = coreSize; this .timeout = timeout; this .timeUnit = timeUnit; this .rejectPolicy = rejectPolicy; taskQue = new BlockingQueue <Runnable>(queueCapacity); } private int coreSize; private long timeout; private TimeUnit timeUnit; public void execute (Runnable task) { synchronized (workers){ if (workers.size() >= coreSize){ taskQue.tryPut(rejectPolicy,task); }else { Worker worker = new Worker (task); System.out.println(Thread.currentThread().toString() + "新增worker:" + worker + ",task:" + task); workers.add(worker); worker.start(); } } } class Worker extends Thread { private Runnable task; public Worker (Runnable task) { this .task = task; } @Override public void run () { while (task != null || (task = taskQue.poll(timeout,timeUnit)) != null ){ try { System.out.println(Thread.currentThread().toString() + "正在执行:" + task); task.run(); }catch (Exception e){ }finally { task = null ; } } synchronized (workers){ System.out.println(Thread.currentThread().toString() + "worker被移除:" + this .toString()); workers.remove(this ); } } } }
步骤4:编写测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public class ThreadPoolTest { public static void main (String[] args) { ThreadPool threadPool = new ThreadPool (1 , 1000 , TimeUnit.MILLISECONDS, 1 , (queue,task)->{ task.run(); }); for (int i = 0 ; i <3 ; i++) { int j = i; threadPool.execute(()->{ try { System.out.println(Thread.currentThread().toString() + "执行任务:" + j); Thread.sleep(1000L ); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
8.1.2 ThreadPoolExecutor
说明:
ScheduledThreadPoolExecutor是带调度的线程池 ThreadPoolExecutor是不带调度的线程池 1)线程池状态 ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
RUNNING 111 Y Y SHUTDOWN 000 N Y 不会接收新任务,但会处理阻塞队列剩余 任务 STOP 001 N N 会中断正在执行的任务,并抛弃阻塞队列 任务 TIDYING 010 任务全执行完毕,活动线程为 0 即将进入 终结 TERMINATED 011 终结状态
从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用 一次 cas 原子操作 进行赋值
1 2 3 4 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));private static int ctlOf (int rs, int wc) { return rs | wc; }
2)构造方法 1 2 3 4 5 6 7 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize 核心线程数目 (最多保留的线程数) maximumPoolSize 最大线程数目 keepAliveTime 生存时间 - 针对救急线程 unit 时间单位 - 针对救急线程 workQueue 阻塞队列 threadFactory 线程工厂 - 可以为线程创建时起个好名字 handler 拒绝策略 工作方式
线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排 队,直到有空闲的线程。
如果队列选择了有界队列 ,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略 。拒绝策略 jdk 提供了 4 种实现,其它 著名框架也提供了实现
AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略 CallerRunsPolicy 让调用者运行任务 DiscardPolicy 放弃本次任务 DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之 Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方 便定位问题 Netty 的实现,是创建一个新线程来执行任务 ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略 PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由 keepAliveTime 和 unit 来控制。
根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池。
3)newFixedThreadPool固定大小线程池 1 2 3 4 5 public static ExecutorService newFixedThreadPool (int nThreads) { return new ThreadPoolExecutor (nThreads, nThreads, 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>()); }
特点
核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间 阻塞队列是无界的,可以放任意数量的任务 评价 适用于任务量已知,相对耗时的任务
内部调用了:ThreadPoolExecutor的一个构造方法
1 2 3 4 5 6 7 8 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this (corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
默认工厂以及默认构造线程的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null ) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-" ; }public Thread newThread (Runnable r) { Thread t = new Thread (group, r, namePrefix + threadNumber.getAndIncrement(), 0 ); if (t.isDaemon()) t.setDaemon(false ); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; }
默认拒绝策略:抛出异常
1 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy ();
4)newCachedThreadPool带缓冲线程池 1 2 3 4 5 public static ExecutorService newCachedThreadPool () { return new ThreadPoolExecutor (0 , Integer.MAX_VALUE, 60L , TimeUnit.SECONDS, new SynchronousQueue <Runnable>()); }
特点
核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收) 救急线程可以无限创建 队列采用了 SynchronousQueue
实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货) 评价 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线 程。
适合任务数比较密集,但每个任务执行时间较短的情况
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 SynchronousQueue<Integer> integers = new SynchronousQueue <>();new Thread (() -> { try { log.debug("putting {} " , 1 ); integers.put(1 ); log.debug("{} putted..." , 1 ); log.debug("putting...{} " , 2 ); integers.put(2 ); log.debug("{} putted..." , 2 ); } catch (InterruptedException e) { e.printStackTrace(); } },"t1" ).start(); sleep(1 );new Thread (() -> { try { log.debug("taking {}" , 1 ); integers.take(); } catch (InterruptedException e) { e.printStackTrace(); } },"t2" ).start(); sleep(1 );new Thread (() -> { try { log.debug("taking {}" , 2 ); integers.take(); } catch (InterruptedException e) { e.printStackTrace(); } },"t3" ).start();
输出
1 2 3 4 5 6 11:48:15.500 c.TestSynchronousQueue [t1] - putting 1 11:48:16.500 c.TestSynchronousQueue [t2] - taking 1 11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted... 11:48:16.500 c.TestSynchronousQueue [t1] - putting...2 11:48:17.502 c.TestSynchronousQueue [t3] - taking 2 11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...
5)newSingleThreadExecutor单线程线程池 1 2 3 4 5 6 public static ExecutorService newSingleThreadExecutor () { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor (1 , 1 , 0L , TimeUnit.MILLISECONDS, new LinkedBlockingQueue <Runnable>())); }
使用场景:
希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
区别:
自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一 个线程,保证池的正常工作 Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改FinalizableDelegatedExecutorService 应用的是装饰器模式,在调用构造方法时将ThreadPoolExecutor对象传给了内部的ExecutorService接口。只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法,也不能重新设置线程池的大小。 Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改 6)提交任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 void execute (Runnable command) ; <T> Future<T> submit (Callable<T> task) ; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny (Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
测试submit
1 2 3 4 5 6 7 8 9 10 11 12 13 private static void method1 (ExecutorService pool) throws InterruptedException, ExecutionException { Future<String> future = pool.submit(() -> { log.debug("running" ); Thread.sleep(1000 ); return "ok" ; }); log.debug("{}" , future.get()); }public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(1 ); method1(pool); }
测试结果
1 2 18:36:58.033 c.TestSubmit [pool-1-thread-1] - running 18:36:59.034 c.TestSubmit [main] - ok
测试invokeAll
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private static void method2 (ExecutorService pool) throws InterruptedException { List<Future<String>> futures = pool.invokeAll(Arrays.asList( () -> { log.debug("begin" ); Thread.sleep(1000 ); return "1" ; }, () -> { log.debug("begin" ); Thread.sleep(500 ); return "2" ; }, () -> { log.debug("begin" ); Thread.sleep(2000 ); return "3" ; } )); futures.forEach( f -> { try { log.debug("{}" , f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); }public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(1 ); method2(pool); }
测试结果
1 2 3 4 5 6 19 :33 :16.530 c.TestSubmit [pool-1 -thread-1 ] - begin19 :33 :17.530 c.TestSubmit [pool-1 -thread-1 ] - begin19 :33 :18.040 c.TestSubmit [pool-1 -thread-1 ] - begin19 :33 :20.051 c.TestSubmit [main] - 1 19 :33 :20.051 c.TestSubmit [main] - 2 19 :33 :20.051 c.TestSubmit [main] - 3
测试invokeAny
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private static void method3 (ExecutorService pool) throws InterruptedException, ExecutionException { String result = pool.invokeAny(Arrays.asList( () -> { log.debug("begin 1" ); Thread.sleep(1000 ); log.debug("end 1" ); return "1" ; }, () -> { log.debug("begin 2" ); Thread.sleep(500 ); log.debug("end 2" ); return "2" ; }, () -> { log.debug("begin 3" ); Thread.sleep(2000 ); log.debug("end 3" ); return "3" ; } )); log.debug("{}" , result); }public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(3 ); method3(pool); }
测试结果
1 2 3 4 5 6 7 8 9 10 19:44:46.314 c.TestSubmit [pool-1-thread-1] - begin 1 19:44:46.314 c.TestSubmit [pool-1-thread-3] - begin 3 19:44:46.314 c.TestSubmit [pool-1-thread-2] - begin 2 19:44:46.817 c.TestSubmit [pool-1-thread-2] - end 2 19:44:46.817 c.TestSubmit [main] - 2 19:47:16.063 c.TestSubmit [pool-1-thread-1] - begin 1 19:47:17.063 c.TestSubmit [pool-1-thread-1] - end 1 19:47:17.063 c.TestSubmit [pool-1-thread-1] - begin 2 19:47:17.063 c.TestSubmit [main] - 1
7)关闭线程池 shutdown
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void shutdown () { final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
shutdownNow
1 2 3 4 5 6 7 List<Runnable> shutdownNow () ;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public List<Runnable> shutdownNow () { List<Runnable> tasks; final ReentrantLock mainLock = this .mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
其他方法
1 2 3 4 5 6 7 boolean isShutdown () ;boolean isTerminated () ;boolean awaitTermination (long timeout, TimeUnit unit) throws InterruptedException;
测试shutdown、shutdownNow、awaitTermination
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @Slf4j(topic = "c.TestShutDown") public class TestShutDown { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService pool = Executors.newFixedThreadPool(2 ); Future<Integer> result1 = pool.submit(() -> { log.debug("task 1 running..." ); Thread.sleep(1000 ); log.debug("task 1 finish..." ); return 1 ; }); Future<Integer> result2 = pool.submit(() -> { log.debug("task 2 running..." ); Thread.sleep(1000 ); log.debug("task 2 finish..." ); return 2 ; }); Future<Integer> result3 = pool.submit(() -> { log.debug("task 3 running..." ); Thread.sleep(1000 ); log.debug("task 3 finish..." ); return 3 ; }); log.debug("shutdown" ); pool.shutdown(); } }
测试结果
1 2 3 4 5 6 7 8 9 10 11 12 13 20:09:13.285 c.TestShutDown [main] - shutdown 20:09:13.285 c.TestShutDown [pool-1-thread-1] - task 1 running... 20:09:13.285 c.TestShutDown [pool-1-thread-2] - task 2 running... 20:09:14.293 c.TestShutDown [pool-1-thread-2] - task 2 finish... 20:09:14.293 c.TestShutDown [pool-1-thread-1] - task 1 finish... 20:09:14.293 c.TestShutDown [pool-1-thread-2] - task 3 running... 20:09:15.303 c.TestShutDown [pool-1-thread-2] - task 3 finish... 20:11:11.750 c.TestShutDown [main] - shutdown 20:11:11.750 c.TestShutDown [pool-1-thread-1] - task 1 running... 20:11:11.750 c.TestShutDown [pool-1-thread-2] - task 2 running... 20:11:11.750 c.TestShutDown [main] - other.... [java.util.concurrent.FutureTask@66d33a]
8)*模式之工作模式Worker Thread 定义
让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。 也可以将其归类为分工模式,它的典型实现 就是线程池,也体现了经典设计模式中的享元模式。
例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那 么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message)
注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率
例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成 服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工
饥饿现象
固定大小线程池会有饥饿现象
两个工人是同一个线程池中的两个线程
他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作
客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待 后厨做菜:没啥说的,做就是了 比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好
但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,这时没人做饭了,饥饿
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class TestDeadLock { static final List<String> MENU = Arrays.asList("地三鲜" , "宫保鸡丁" , "辣子鸡丁" , "烤鸡翅" ); static Random RANDOM = new Random (); static String cooking () { return MENU.get(RANDOM.nextInt(MENU.size())); } public static void main (String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(2 ); executorService.execute(() -> { log.debug("处理点餐..." ); Future<String> f = executorService.submit(() -> { log.debug("做菜" ); return cooking(); }); try { log.debug("上菜: {}" , f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } }
输出
1 2 3 17:21:27.883 c.TestDeadLock [pool-1-thread-1] - 处理点餐... 17:21:27.891 c.TestDeadLock [pool-1-thread-2] - 做菜 17:21:27.891 c.TestDeadLock [pool-1-thread-1] - 上菜: 烤鸡翅
当注释取消后,可能的输出
1 2 17:08:41.339 c.TestDeadLock [pool-1-thread-2] - 处理点餐... 17:08:41.339 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
解决方法可以增加线程池的大小,不过不是根本解决方案,还是前面提到的,不同的任务类型,采用不同的线程 池 ,例如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class TestDeadLock { static final List<String> MENU = Arrays.asList("地三鲜" , "宫保鸡丁" , "辣子鸡丁" , "烤鸡翅" ); static Random RANDOM = new Random (); static String cooking () { return MENU.get(RANDOM.nextInt(MENU.size())); } public static void main (String[] args) { ExecutorService waiterPool = Executors.newFixedThreadPool(1 ); ExecutorService cookPool = Executors.newFixedThreadPool(1 ); waiterPool.execute(() -> { log.debug("处理点餐..." ); Future<String> f = cookPool.submit(() -> { log.debug("做菜" ); return cooking(); }); try { log.debug("上菜: {}" , f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); waiterPool.execute(() -> { log.debug("处理点餐..." ); Future<String> f = cookPool.submit(() -> { log.debug("做菜" ); return cooking(); }); try { log.debug("上菜: {}" , f.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); } }
输出
1 2 3 4 5 6 17:25:14.626 c.TestDeadLock [pool-1-thread-1] - 处理点餐... 17:25:14.630 c.TestDeadLock [pool-2-thread-1] - 做菜 17:25:14.631 c.TestDeadLock [pool-1-thread-1] - 上菜: 地三鲜 17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 处理点餐... 17:25:14.632 c.TestDeadLock [pool-2-thread-1] - 做菜 17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 上菜: 辣子鸡丁
创建多少线程池合适
过小会导致程序不能充分地利用系统资源、容易导致饥饿 过大会导致更多的线程上下文切换,占用更多内存 CPU 密集型运算
通常采用 cpu 核数 + 1
能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因 导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费
I/O 密集型运算
CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程 RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。
经验公式如下
线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间
例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式
4 * 100% * 100% / 50% = 8
例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式
4 * 100% * 100% / 10% = 40
9)任务调度线程池 在『任务调度线程池』功能加入之前(JDK1.3),可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但 由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个 任务的延迟或异常都将会影响到之后的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void main (String[] args) { Timer timer = new Timer (); TimerTask task1 = new TimerTask () { @Override public void run () { log.debug("task 1" ); sleep(2 ); } }; TimerTask task2 = new TimerTask () { @Override public void run () { log.debug("task 2" ); } }; timer.schedule(task1, 1000 ); timer.schedule(task2, 1000 ); }
输出
1 2 3 20:46:09.444 c.TestTimer [main] - start... 20:46:10.447 c.TestTimer [Timer-0] - task 1 20:46:12.448 c.TestTimer [Timer-0] - task 2
使用 ScheduledExecutorService 改写:
1 2 3 4 5 6 7 8 9 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2 ); executor.schedule(() -> { System.out.println("任务1,执行时间:" + new Date ()); try { Thread.sleep(2000 ); } catch (InterruptedException e) { } }, 1000 , TimeUnit.MILLISECONDS); executor.schedule(() -> { System.out.println("任务2,执行时间:" + new Date ()); }, 1000 , TimeUnit.MILLISECONDS);
输出
1 2 任务1,执行时间:Thu Jan 03 12:45:17 CST 2019 任务2,执行时间:Thu Jan 03 12:45:17 CST 2019
scheduleAtFixedRate 例子:
1 2 3 4 5 ScheduledExecutorService pool = Executors.newScheduledThreadPool(1 ); log.debug("start..." ); pool.scheduleAtFixedRate(() -> { log.debug("running..." ); }, 1 , 1 , TimeUnit.SECONDS);
输出
1 2 3 4 5 21:45:43.167 c.TestTimer [main] - start... 21:45:44.215 c.TestTimer [pool-1-thread-1] - running... 21:45:45.215 c.TestTimer [pool-1-thread-1] - running... 21:45:46.215 c.TestTimer [pool-1-thread-1] - running... 21:45:47.215 c.TestTimer [pool-1-thread-1] - running...
scheduleAtFixedRate 例子(任务执行时间超过了间隔时间):
1 2 3 4 5 6 ScheduledExecutorService pool = Executors.newScheduledThreadPool(1 ); log.debug("start..." ); pool.scheduleAtFixedRate(() -> { log.debug("running..." ); sleep(2 ); }, 1 , 1 , TimeUnit.SECONDS);
输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s
1 2 3 4 5 21:44:30.311 c.TestTimer [main] - start... 21:44:31.360 c.TestTimer [pool-1-thread-1] - running... 21:44:33.361 c.TestTimer [pool-1-thread-1] - running... 21:44:35.362 c.TestTimer [pool-1-thread-1] - running... 21:44:37.362 c.TestTimer [pool-1-thread-1] - running...
scheduleWithFixedDelay 例子:
1 2 3 4 5 6 ScheduledExecutorService pool = Executors.newScheduledThreadPool(1 ); log.debug("start..." ); pool.scheduleWithFixedDelay(()-> { log.debug("running..." ); sleep(2 ); }, 1 , 1 , TimeUnit.SECONDS);
输出分析:一开始,延时 1s,scheduleWithFixedDelay 的间隔是 上一个任务结束 <-> 延时 <-> 下一个任务开始 所 以间隔都是 3s
1 2 3 4 5 21:40:55.078 c.TestTimer [main] - start... 21:40:56.140 c.TestTimer [pool-1-thread-1] - running... 21:40:59.143 c.TestTimer [pool-1-thread-1] - running... 21:41:02.145 c.TestTimer [pool-1-thread-1] - running... 21:41:05.147 c.TestTimer [pool-1-thread-1] - running...
评价 整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线 程也不会被释放。用来执行延迟或反复执行的任务
10)正确处理执行任务异常 不论是哪个线程池,在线程执行的任务发生异常后既不会抛出,也不会捕获,这时就需要我们做一定的处理。
方法1:主动捉异常
1 2 3 4 5 6 7 8 9 ExecutorService pool = Executors.newFixedThreadPool(1 ); pool.submit(() -> { try { log.debug("task1" ); int i = 1 / 0 ; } catch (Exception e) { log.error("error:" , e); } });
输出
1 2 3 4 5 6 7 8 9 21:59:04.558 c.TestTimer [pool-1-thread-1] - task1 21:59:04.562 c.TestTimer [pool-1-thread-1] - error: java.lang.ArithmeticException: / by zero at cn.itcast.n8.TestTimer.lambda$main$0 (TestTimer.java:28) at java.util.concurrent.Executors$RunnableAdapter .call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker .run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
方法2:使用 Future
说明:
lambda表达式内要有返回值,编译器才能将其识别为Callable,否则将识别为Runnable,也就不能用FutureTask 方法中如果出异常,futuretask.get
会返回这个异常,否者正常返回。 1 2 3 4 5 6 7 ExecutorService pool = Executors.newFixedThreadPool(1 ); Future<Boolean> f = pool.submit(() -> { log.debug("task1" ); int i = 1 / 0 ; return true ; }); log.debug("result:{}" , f.get());
输出
1 2 3 4 5 6 7 8 9 10 11 12 21:54:58.208 c.TestTimer [pool-1-thread-1] - task1 Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at cn.itcast.n8.TestTimer.main(TestTimer.java:31) Caused by: java.lang.ArithmeticException: / by zero at cn.itcast.n8.TestTimer.lambda$main$0 (TestTimer.java:28) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker .run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
11)* 应用之定时任务 如何让每周四 18:00:00 定时执行任务?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 LocalDateTime now = LocalDateTime.now();LocalDateTime thursday = now.with(DayOfWeek.THURSDAY).withHour(18 ).withMinute(0 ).withSecond(0 ).withNano(0 );if (now.compareTo(thursday) >= 0 ) { thursday = thursday.plusWeeks(1 ); }long initialDelay = Duration.between(now, thursday).toMillis();long oneWeek = 7 * 24 * 3600 * 1000 ;ScheduledExecutorService executor = Executors.newScheduledThreadPool(2 ); System.out.println("开始时间:" + new Date ()); executor.scheduleAtFixedRate(() -> { System.out.println("执行时间:" + new Date ()); }, initialDelay, oneWeek, TimeUnit.MILLISECONDS);
12)Tomcat 线程池 Tomcat 在哪里用到了线程池呢
LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲 Acceptor 只负责【接收新的 socket 连接】 Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理 Executor 线程池中的工作线程最终负责【处理请求】 Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同
如果总线程数达到 maximumPoolSize这时不会立刻抛 RejectedExecutionException 异常 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常 源码 tomcat-7.0.42
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public void execute (Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super .execute(command); } catch (RejectedExecutionException rx) { if (super .getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super .getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException ("Queue capacity is full." ); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); Thread.interrupted(); throw new RejectedExecutionException (x); } } else { submittedCount.decrementAndGet(); throw rx; } } }
TaskQueue.java
1 2 3 4 5 6 7 8 public boolean force (Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if ( parent.isShutdown() ) throw new RejectedExecutionException ( "Executor not running, can't force a command into the queue" ); return super .offer(o,timeout,unit); is rejected }
Connector 配置
acceptorThreadCount
1 acceptor 线程数量 pollerThreadCount
1 poller 线程数量 minSpareThreads
10 核心线程数,即 corePoolSize maxThreads
200 最大线程数,即 maximumPoolSize executor
- Executor 名称,用来引用下面的 Executor
Executor 线程配置
threadPriority
5 线程优先级 deamon
true 是否守护线程 minSpareThreads
25 核心线程数,即corePoolSize maxThreads
200 最大线程数,即 maximumPoolSize maxIdleTime
60000 线程生存时间,单位是毫秒,默认值即 1 分钟 maxQueueSize
Integer.MAX_VALUE 队列长度 prestartminSpareThreads
false 核心线程是否在服务器启动时启动
8.1.3 Fork/Join 线程池 概念
Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型 运算
所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计 算,如归并排序、斐波那契数列、都可以用分治思想进行求解
Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运 算效率
Fork/Join 默认会创建与 cpu 核心数大小相同的线程池
应用之求和 提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值)
例如下面定义了一个对 1~n 之间的整数求和的任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @Slf4j(topic = "c.AddTask") class AddTask1 extends RecursiveTask <Integer> { int n; public AddTask1 (int n) { this .n = n; } @Override public String toString () { return "{" + n + '}' ; } @Override protected Integer compute () { if (n == 1 ) { log.debug("join() {}" , n); return n; } AddTask1 t1 = new AddTask1 (n - 1 ); t1.fork(); log.debug("fork() {} + {}" , n, t1); int result = n + t1.join(); log.debug("join() {} + {} = {}" , n, t1, result); return result; } }
然后提交给 ForkJoinPool 来执行
1 2 3 4 public static void main (String[] args) { ForkJoinPool pool = new ForkJoinPool (4 ); System.out.println(pool.invoke(new AddTask1 (5 ))); }
结果
1 2 3 4 5 6 7 8 9 10 [ForkJoinPool-1-worker-0] - fork() 2 + {1} [ForkJoinPool-1-worker-1] - fork() 5 + {4} [ForkJoinPool-1-worker-0] - join () 1 [ForkJoinPool-1-worker-0] - join () 2 + {1} = 3 [ForkJoinPool-1-worker-2] - fork() 4 + {3} [ForkJoinPool-1-worker-3] - fork() 3 + {2} [ForkJoinPool-1-worker-3] - join () 3 + {2} = 6 [ForkJoinPool-1-worker-2] - join () 4 + {3} = 10 [ForkJoinPool-1-worker-1] - join () 5 + {4} = 15 15
用图来表示
改进(5要等待其他任务执行完才能继续执行,能不能让任务间的依赖减少,使他们并行执行 )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class AddTask3 extends RecursiveTask <Integer> { int begin; int end; public AddTask3 (int begin, int end) { this .begin = begin; this .end = end; } @Override public String toString () { return "{" + begin + "," + end + '}' ; } @Override protected Integer compute () { if (begin == end) { log.debug("join() {}" , begin); return begin; } if (end - begin == 1 ) { log.debug("join() {} + {} = {}" , begin, end, end + begin); return end + begin; } int mid = (end + begin) / 2 ; AddTask3 t1 = new AddTask3 (begin, mid); t1.fork(); AddTask3 t2 = new AddTask3 (mid + 1 , end); t2.fork(); log.debug("fork() {} + {} = ?" , t1, t2); int result = t1.join() + t2.join(); log.debug("join() {} + {} = {}" , t1, t2, result); return result; } }
然后提交给 ForkJoinPool 来执行
1 2 3 4 public static void main (String[] args) { ForkJoinPool pool = new ForkJoinPool (4 ); System.out.println(pool.invoke(new AddTask3 (1 , 10 ))); }
结果
1 2 3 4 5 6 7 8 [ForkJoinPool-1-worker-0] - join () 1 + 2 = 3 [ForkJoinPool-1-worker-3] - join () 4 + 5 = 9 [ForkJoinPool-1-worker-0] - join () 3 [ForkJoinPool-1-worker-1] - fork () {1,3} + {4,5} = ? [ForkJoinPool-1-worker-2] - fork () {1,2} + {3,3} = ? [ForkJoinPool-1-worker-2] - join () {1,2} + {3,3} = 6 [ForkJoinPool-1-worker-1] - join () {1,3} + {4,5} = 15 15
用图来表示
8.2 J.U.C并发编程 8.2.1 *AQS 阻塞式锁原理 概述
全称是 AbstractQueuedSynchronizer,是阻塞式锁和相关的同步器工具的框架
特点:
用 state 属性来表示资源的状态(分独占模式和共享模式),子类需要定义如何维护这个状态,控制如何获取 锁和释放锁getState - 获取 state 状态 setState - 设置 state 状态 compareAndSetState - cas 机制设置 state 状态 独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源 提供了基于 FIFO 的等待队列,类似于 Monitor 的 EntryList 条件变量来实现等待、唤醒机制,支持多个条件变量,类似于 Monitor 的 WaitSet 子类主要实现这样一些方法(默认抛出 UnsupportedOperationException)
tryAcquire tryRelease tryAcquireShared tryReleaseShared isHeldExclusively 获取锁的姿势
1 2 3 4 if (!tryAcquire(arg)) { }
释放锁的姿势
1 2 3 4 if (tryRelease(arg)) { }
1)实现不可重入锁 自定义同步器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 final class MySync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int acquires) { if (acquires == 1 ){ if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } } return false ; } @Override protected boolean tryRelease (int acquires) { if (acquires == 1 ) { if (getState() == 0 ) { throw new IllegalMonitorStateException (); } setExclusiveOwnerThread(null ); setState(0 ); return true ; } return false ; } protected Condition newCondition () { return new ConditionObject (); } @Override protected boolean isHeldExclusively () { return getState() == 1 ; } }
自定义锁
有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 class MyLock implements Lock { static MySync sync = new MySync (); @Override public void lock () { sync.acquire(1 ); } @Override public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } @Override public boolean tryLock () { return sync.tryAcquire(1 ); } @Override public boolean tryLock (long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 , unit.toNanos(time)); } @Override public void unlock () { sync.release(1 ); } @Override public Condition newCondition () { return sync.newCondition(); } }
测试一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 MyLock lock = new MyLock ();new Thread (() -> { lock.lock(); try { log.debug("locking..." ); sleep(1 ); } finally { log.debug("unlocking..." ); lock.unlock(); } },"t1" ).start();new Thread (() -> { lock.lock(); try { log.debug("locking..." ); } finally { log.debug("unlocking..." ); lock.unlock(); } },"t2" ).start();
输出
1 2 3 4 22:29:28.727 c.TestAqs [t1] - locking... 22:29:29.732 c.TestAqs [t1] - unlocking... 22:29:29.732 c.TestAqs [t2] - locking... 22:29:29.732 c.TestAqs [t2] - unlocking...
不可重入测试
如果改为下面代码,会发现自己也会被挡住(只会打印一次 locking)
1 2 3 4 lock.lock(); log.debug("locking..." ); lock.lock(); log.debug("locking..." );
2)心得 起源
早期程序员会自己通过一种同步器去实现另一种相近的同步器,例如用可重入锁去实现信号量,或反之。这显然不 够优雅,于是在 JSR166(java 规范提案)中创建了 AQS,提供了这种通用的同步器机制。
目标
AQS 要实现的功能目标
阻塞版本获取锁 acquire 和非阻塞的版本尝试获取锁 tryAcquire 获取锁超时机制 通过打断取消机制 独占机制及共享机制 条件不满足时的等待机制 要实现的性能目标
Instead, the primary performance goal here is scalability: to predictably maintain efficiency even, or especially, when synchronizers are contended.
设计
AQS 的基本思想其实很简单
获取锁的逻辑
1 2 3 4 5 6 while (state 状态不允许获取) { if (队列中还没有此线程) { 入队并阻塞 } } 当前线程出队
释放锁的逻辑
1 2 3 if (state 状态允许了) { 恢复阻塞的线程(s) }
要点
state 设计state 使用 volatile 配合 cas 保证其修改时的原子性 state 使用了 32bit int 来维护同步状态,因为当时使用 long 在很多平台下测试的结果并不理想 阻塞恢复设计早期的控制线程暂停和恢复的 api 有 suspend 和 resume,但它们是不可用的,因为如果先调用的 resume 那么 suspend 将感知不到 解决方法是使用 park & unpark 来实现线程的暂停和恢复,具体原理在之前讲过了,先 unpark 再 park 也没 问题 park & unpark 是针对线程的,而不是针对同步器的,因此控制粒度更为精细 park 线程还可以通过 interrupt 打断 队列设计使用了 FIFO 先入先出队列,并不支持优先级队列 设计时借鉴了 CLH 队列,它是一种单向无锁队列
队列中有 head 和 tail 两个指针节点,都用 volatile 修饰配合 cas 使用,每个节点有 state 维护节点状态 入队伪代码,只需要考虑 tail 赋值的原子性
1 2 3 4 5 do { Node prev = tail; } while (tail.compareAndSet(prev, node))
出队伪代码
1 2 3 4 5 while ((Node prev=node.prev).state != 唤醒状态) { } head = node;
CLH 好处:
AQS 在一些方面改进了 CLH
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
主要用到 AQS 的并发工具类
8.2.2 ReentrantLock 原理
1)非公平锁实现原理 加锁解锁流程
先从构造器开始看,默认为非公平锁实现
1 2 3 public ReentrantLock () { sync = new NonfairSync (); }
NonfairSync 继承自 AQS 没有竞争时
第一个竞争出现时
Thread-1 执行了
CAS 尝试将 state 由 0 改为 1,结果失败 进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败 接下来进入 addWaiter 逻辑,构造 Node 队列图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态 Node 的创建是懒惰的 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
当前线程进入 acquireQueued 逻辑
acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true 进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
再次有多个线程经历上述过程竞争失败,变成这个样子
Thread-0 释放锁,进入 tryRelease 流程,如果成功
设置 exclusiveOwnerThread 为 null state = 0
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程
如果加锁成功(没有竞争),会设置
exclusiveOwnerThread 为 Thread-1,state = 1 head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread 原本的 head 因为从链表断开,而可被垃圾回收 如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了
如果不巧又被 Thread-4 占了先
Thread-4 被设置为 exclusiveOwnerThread,state = 1 Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞 加锁与解锁源码 加锁源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L ; final void lock () { if (compareAndSetState(0 , 1 )) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1 ); } public final void acquire (int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } protected final boolean tryAcquire (int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) { tail = head; } } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if ( shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() ) { interrupted = true ; } } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) { return true ; } if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); } }
注意
是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的 waitStatus 决定
总结:
调用lock
,尝试将state从0修改为1成功:将owner设为当前线程 失败:调用acquire
->tryAcquire
->nonfairTryAcquire
,判断state=0则获得锁,或者state不为0但当前线程持有锁则重入锁,以上两种情况tryAcquire
返回true,剩余情况返回false。true:获得锁 false:调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
,其中addwiter
将关联线程的节点插入AQS队列尾部,进入acquireQueued
中的for循环:如果当前节点是头节点,并尝试获得锁成功,将当前节点设为头节点,清除此节点信息,返回打断标记。 调用shoudParkAfterFailure
,第一次调用返回false,并将前驱节点改为-1,第二次循环如果再进入此方法,会进入阻塞并检查打断的方法。 解锁源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 static final class NonfairSync extends Sync { public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if ( h != null && h.waitStatus != 0 ) { unparkSuccessor(h); } return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) { compareAndSetWaitStatus(node, ws, 0 ); } Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); } }
总结:
unlock
->syn.release
(1)->tryRelease
(1),如果当前线程并不持有锁,抛异常。state减去1,如果之后state为0,解锁成功,返回true;如果仍大于0,表示解锁不完全,当前线程依旧持有锁,返回false。返回true:检查AQS队列第一个节点状态图是否为SIGNAL
(意味着有责任唤醒其后记节点),如果有,调用unparkSuccessor
。再unparkSuccessor
中,不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点,如果有,将其唤醒。 返回false: 2)可重入锁原理 当持有锁的线程再次尝试获取锁时,会将state的值加1,state表示锁的重入量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 static final class NonfairSync extends Sync { final boolean nonfairTryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } }
3)可打断锁原理 不可打断模式 在此模式下,即使它被打断,仍会驻留在 AQS 队列中,并将打断信号存储在一个interrupt变量中。一直要等到获得锁后方能得知自己被打断了,并且调用selfInterrupt
方法打断自己。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 static final class NonfairSync extends Sync { private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); } final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if ( shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() ) { interrupted = true ; } } } finally { if (failed) cancelAcquire(node); } } public final void acquire (int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } static void selfInterrupt () { Thread.currentThread().interrupt(); } }
可打断模式 此模式下即使线程在等待队列中等待,一旦被打断,就会立刻抛出打断异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 static final class NonfairSync extends Sync { public final void acquireInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); } private void doAcquireInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return ; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) { throw new InterruptedException (); } } } finally { if (failed) cancelAcquire(node); } } }
4)公平锁实现原理 简而言之,公平与非公平的区别在于,公平锁中的tryAcquire方法被重写了,新来的线程即便得知了锁的state为0,也要先判断等待队列中是否还有线程等待,只有当队列没有线程等待式,才获得锁 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L ; final void lock () { acquire(1 ); } public final void acquire (int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ( (s = h.next) == null || s.thread != Thread.currentThread() ); } }
5)条件变量实现原理 每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject
await 流程 开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程
创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
park 阻塞 Thread-0
总结:
创建一个节点,关联当前线程,并插入到当前Condition队列的尾部 调用fullRelease
,完全释放同步器中的锁,并记录当前线程的锁重入数 唤醒(park)AQS队列中的第一个线程 调用park方法,阻塞当前线程。 signal 流程 假设 Thread-1 要来唤醒 Thread-0
进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1
Thread-1 释放锁,进入 unlock 流程,略
总结:
当前持有锁的线程唤醒等待队列中的线程,调用doSignal或doSignalAll方法,将等待队列中的第一个(或全部)节点插入到AQS队列中的尾部。 将插入的节点的状态从Condition设置为0,将插入节点的前一个节点的状态设置为-1(意味着要承担唤醒后一个节点的责任) 当前线程释放锁,parkAQS队列中的第一个节点线程。 源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 public class ConditionObject implements Condition , java.io.Serializable { private static final long serialVersionUID = 1173984872572414699L ; private transient Node firstWaiter; private transient Node lastWaiter; public ConditionObject () { } private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node (Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } private void doSignal (Node first) { do { if ( (firstWaiter = first.nextWaiter) == null ) { lastWaiter = null ; } first.nextWaiter = null ; } while ( !transferForSignal(first) && (first = firstWaiter) != null ); } final boolean transferForSignal (Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0 )) return false ; Node p = enq(node); int ws = p.waitStatus; if ( ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) ) { LockSupport.unpark(node.thread); } return true ; } private void doSignalAll (Node first) { lastWaiter = firstWaiter = null ; do { Node next = first.nextWaiter; first.nextWaiter = null ; transferForSignal(first); first = next; } while (first != null ); } private void unlinkCancelledWaiters () { } public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); } public final void signalAll () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignalAll(first); } public final void awaitUninterruptibly () { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if (Thread.interrupted()) interrupted = true ; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } private void doSignalAll (Node first) { lastWaiter = firstWaiter = null ; do { Node next = first.nextWaiter; first.nextWaiter = null ; transferForSignal(first); first = next; } while (first != null ); } private void unlinkCancelledWaiters () { } public final void signal () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignal(first); } public final void signalAll () { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); Node first = firstWaiter; if (first != null ) doSignalAll(first); } public final void awaitUninterruptibly () { Node node = addConditionWaiter(); int savedState = fullyRelease(node); boolean interrupted = false ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if (Thread.interrupted()) interrupted = true ; } if (acquireQueued(node, savedState) || interrupted) selfInterrupt(); } final int fullyRelease (Node node) { boolean failed = true ; try { int savedState = getState(); if (release(savedState)) { failed = false ; return savedState; } else { throw new IllegalMonitorStateException (); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } private static final int REINTERRUPT = 1 ; private static final int THROW_IE = -1 ; private int checkInterruptWhileWaiting (Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0 ; } private void reportInterruptAfterWait (int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException (); else if (interruptMode == REINTERRUPT) selfInterrupt(); } public final void await () throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException (); } Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0 ; while (!isOnSyncQueue(node)) { LockSupport.park(this ); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); } private Node addConditionWaiter () { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node (Thread.currentThread(), Node.CONDITION); if (t == null ) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } final boolean isOnSyncQueue (Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null ) return false ; if (node.next != null ) return true ; return findNodeFromTail(node); } public final long awaitNanos (long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException (); } Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0 ; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L ) { transferAfterCancelledWait(node); break ; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 ) break ; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null ) unlinkCancelledWaiters(); if (interruptMode != 0 ) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime(); } public final boolean awaitUntil (Date deadline) throws InterruptedException { } public final boolean await (long time, TimeUnit unit) throws InterruptedException { } }
8.3 读写锁 8.3.1 ReentrantReadWriteLock 当读操作远远高于写操作时,这时候使用读写锁
让读-读
可以并发,提高性能。 类似于数据库中的select ... from ... lock in share mode
提供一个数据容器类
内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法
测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class DataContainer { private Object data; private ReentrantReadWriteLock rw = new ReentrantReadWriteLock (); private ReentrantReadWriteLock.ReadLock r = rw.readLock(); private ReentrantReadWriteLock.WriteLock w = rw.writeLock(); public Object read () { log.debug("获取读锁..." ); r.lock(); try { log.debug("读取" ); sleep(1 ); return data; } finally { log.debug("释放读锁..." ); r.unlock(); } } public void write () { log.debug("获取写锁..." ); w.lock(); try { log.debug("写入" ); sleep(1 ); } finally { log.debug("释放写锁..." ); w.unlock(); } } }
测试读锁-读锁
可以并发
1 2 3 4 5 6 7 DataContainer dataContainer = new DataContainer ();new Thread (() -> { dataContainer.read(); }, "t1" ).start();new Thread (() -> { dataContainer.read(); }, "t2" ).start();
输出结果,从这里可以看到 Thread-0 锁定期间,Thread-1 的读操作不受影响
1 2 3 4 5 6 14:05:14.341 c.DataContainer [t2] - 获取读锁... 14:05:14.341 c.DataContainer [t1] - 获取读锁... 14:05:14.345 c.DataContainer [t1] - 读取 14:05:14.345 c.DataContainer [t2] - 读取 14:05:15.365 c.DataContainer [t2] - 释放读锁... 14:05:15.386 c.DataContainer [t1] - 释放读锁...
测试读锁-写锁
相互阻塞
1 2 3 4 5 6 7 8 DataContainer dataContainer = new DataContainer ();new Thread (() -> { dataContainer.read(); }, "t1" ).start(); Thread.sleep(100 );new Thread (() -> { dataContainer.write(); }, "t2" ).start();
输出结果
1 2 3 4 5 6 14 :04 :21.838 c.DataContainer [t1] - 获取读锁... 14 :04 :21.838 c.DataContainer [t2] - 获取写锁... 14 :04 :21.841 c.DataContainer [t2] - 写入14 :04 :22.843 c.DataContainer [t2] - 释放写锁... 14 :04 :22.843 c.DataContainer [t1] - 读取14 :04 :23.843 c.DataContainer [t1] - 释放读锁...
写锁-写锁
也是相互阻塞的,这里就不测试了
注意事项
读锁不支持条件变量 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待 1 2 3 4 5 6 7 8 9 10 11 12 r.lock();try { w.lock(); try { } finally { w.unlock(); } } finally { r.unlock(); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock (); void processCachedData () { rwl.readLock().lock(); if (!cacheValid) { rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true ; } rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }
8.3.2 * 应用之缓存 缓存更新策略 更新时,是先清缓存还是先更新数据库 【应该先更新库再清缓存 】
先清缓存
先更新数据库
补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询
这种情况的出现几率非常小,见 facebook 论文
读写锁实现一致性缓存 使用读写锁实现一个简单的按需加载缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 class GenericCachedDao <T> { HashMap<SqlPair, T> map = new HashMap <>(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock (); GenericDao genericDao = new GenericDao (); public int update (String sql, Object... params) { SqlPair key = new SqlPair (sql, params); lock.writeLock().lock(); try { int rows = genericDao.update(sql, params); map.clear(); return rows; } finally { lock.writeLock().unlock(); } } public T queryOne (Class<T> beanClass, String sql, Object... params) { SqlPair key = new SqlPair (sql, params); lock.readLock().lock(); try { T value = map.get(key); if (value != null ) { return value; } } finally { lock.readLock().unlock(); } lock.writeLock().lock(); try { T value = map.get(key); if (value == null ) { value = genericDao.queryOne(beanClass, sql, params); map.put(key, value); } return value; } finally { lock.writeLock().unlock(); } } class SqlPair { private String sql; private Object[] params; public SqlPair (String sql, Object[] params) { this .sql = sql; this .params = params; } @Override public boolean equals (Object o) { if (this == o) { return true ; } if (o == null || getClass() != o.getClass()) { return false ; } SqlPair sqlPair = (SqlPair) o; return sql.equals(sqlPair.sql) && Arrays.equals(params, sqlPair.params); } @Override public int hashCode () { int result = Objects.hash(sql); result = 31 * result + Arrays.hashCode(params); return result; } } }
注意
8.3.3 * 读写锁原理 图解流程 读写锁用的是同一个 Sycn 同步器,因此等待队列、state 等也是同一个
t1 w.lock,t2 r.lock
1) t1 成功上锁,流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁 使用的是 state 的高 16 位
2)t2 执行 r.lock,这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。如果有写 锁占据,那么 tryAcquireShared 返回 -1 表示失败
tryAcquireShared 返回值表示
-1 表示失败 0 表示成功,但后继节点不会继续唤醒 正数表示成功,而且数值是还有几个后继节点需要唤醒,读写锁返回 1
3)这时会进入 sync.doAcquireShared(1) 流程,首先也是调用 addWaiter 添加节点,不同之处在于节点被设置为 Node.SHARED 模式而非 Node.EXCLUSIVE 模式,注意此时 t2 仍处于活跃状态
4)t2 会看看自己的节点是不是老二,如果是,还会再次调用 tryAcquireShared(1) 来尝试获取锁
5)如果没有成功,在 doAcquireShared 内 for (;;) 循环一次,把前驱节点的 waitStatus 改为 -1,再 for (;;) 循环一 次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park
t3 r.lock,t4 w.lock
这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子
image-20220314191927467 t1 w.unlock
这时会走到写锁的 sync.release(1) 流程,调用 sync.tryRelease(1) 成功,变成下面的样子
接下来执行唤醒流程 sync.unparkSuccessor,即让老二恢复运行,这时 t2 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行
这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一
这时 t2 已经恢复运行,接下来 t2 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
image-20220314192048242 事情还没完,在 setHeadAndPropagate 方法内还会检查下一个节点是否是 shared,如果是则调用 doReleaseShared() 将 head 的状态从 -1 改为 0 并唤醒老二,这时 t3 在 doAcquireShared 内 parkAndCheckInterrupt() 处恢复运行
这回再来一次 for (;;) 执行 tryAcquireShared 成功则让读锁计数加一
image-20220314192123102 这时 t3 已经恢复运行,接下来 t3 调用 setHeadAndPropagate(node, 1),它原本所在节点被置为头节点
下一个节点不是 shared 了,因此不会继续唤醒 t4 所在节点
t2 r.unlock,t3 r.unlock
t2 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,但由于计数还不为零
t3 进入 sync.releaseShared(1) 中,调用 tryReleaseShared(1) 让计数减一,这回计数为零了,进入 doReleaseShared() 将头节点从 -1 改为 0 并唤醒老二,即
之后 t4 在 acquireQueued 中 parkAndCheckInterrupt 处恢复运行,再次 for (;;) 这次自己是老二,并且没有其他 竞争,tryAcquire(1) 成功,修改头结点,流程结束
源码分析 写锁上锁流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 static final class NonfairSync extends Sync { public void lock () { sync.acquire(1 ); } public final void acquire (int arg) { if ( !tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg) ) { selfInterrupt(); } } protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if ( w == 0 || current != getExclusiveOwnerThread() ) { return false ; } if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if ( writerShouldBlock() || !compareAndSetState(c, c + acquires) ) { return false ; } setExclusiveOwnerThread(current); return true ; } final boolean writerShouldBlock () { return false ; } }
总结:
lock
-> syn.acquire
->tryAquire
如果有锁:如果是写锁或者锁持有者不为自己,返回false 如果时写锁且为自己持有,则重入 如果无锁:判断无序阻塞并设置state成功后,将owner设为自己,返回true 成功,则获得了锁 失败:调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
进入阻塞队列,将节点状态设置为EXCLUSIVE,之后的逻辑与之前的aquireQueued类似。 写锁释放流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 static final class NonfairSync extends Sync { public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected final boolean tryRelease (int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0 ; if (free) { setExclusiveOwnerThread(null ); } setState(nextc); return free; } }
总结:
读锁上锁流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 static final class NonfairSync extends Sync { public void lock () { sync.acquireShared(1 ); } public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) { doAcquireShared(arg); } } protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if ( exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current ) { return -1 ; } int r = sharedCount(c); if ( !readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT) ) { return 1 ; } return fullTryAcquireShared(current); } final boolean readerShouldBlock () { return apparentlyFirstQueuedIsExclusive(); } final int fullTryAcquireShared (Thread current) { HoldCounter rh = null ; for (;;) { int c = getState(); if (exclusiveCount(c) != 0 ) { if (getExclusiveOwnerThread() != current) return -1 ; } else if (readerShouldBlock()) { } if (sharedCount(c) == MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); if (compareAndSetState(c, c + SHARED_UNIT)) { return 1 ; } } } private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if ( shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt() ) { interrupted = true ; } } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) { doReleaseShared(); } } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } } }
总结:
lock
->syn.acquireShare
->tryAcquireShare
如果其他线程持有写锁:则失败,返回-1 否则:判断无需等待后,将state加上一个写锁的单位,返回1 返回值大于等于0:成功 返回值小于0:调用doAcquireShare,类似之前的aquireQueued,将当前线程关联节点,状态设置为SHARE,插入AQS队列尾部。在for循环中判断当前节点的前驱节点是否为头节点是:调用tryAcquireShare
如果返回值大于等于0,则获取锁成功,并调用setHeadAndPropagate
,出队,并不断唤醒AQS队列中的状态为SHARE的节点,直到下一个节点为EXCLUSIVE。记录打断标记,之后退出方法(不返回打断标记) 判断是否在失败后阻塞是:阻塞住,并监测打断信号。 否则:将前驱节点状态设为-1。(下一次循环就又要阻塞了) 读锁释放流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 static final class NonfairSync extends Sync { public void unlock () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected final boolean tryReleaseShared (int unused) { for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) { return nextc == 0 ; } } } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } } }
总结:
unlock
->releaseShared
->tryReleaseShared
,将state减去一个share单元,最后state为0则返回true,不然返回false。返回tue:调用doReleaseShare
,唤醒队列中的节点。 返回false:解锁不完全。 8.3.4 StampedLock 带戳读写锁 该类自 JDK 8 加入,是为了进一步优化读性能,它的特点是在使用读锁、写锁时都必须配合【戳】使用 加解读锁
1 2 long stamp = lock.readLock(); lock.unlockRead(stamp);
加解写锁
1 2 long stamp = lock.writeLock(); lock.unlockWrite(stamp);
乐观读,StampedLock 支持 tryOptimisticRead() 方法(乐观读),读取完毕后需要做一次 戳校验 如果校验通 过,表示这期间确实没有写操作,数据可以安全使用,如果校验没通过,需要重新获取读锁,保证数据安全。
1 2 3 4 5 long stamp = lock.tryOptimisticRead();if (!lock.validate(stamp)){ }
提供一个数据容器类
内部分别使用读锁保护数据的read()
方法,写锁保护数据的write()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 class DataContainerStamped { private int data; private final StampedLock lock = new StampedLock (); public DataContainerStamped (int data) { this .data = data; } public int read (int readTime) { long stamp = lock.tryOptimisticRead(); log.debug("optimistic read locking...{}" , stamp); sleep(readTime); if (lock.validate(stamp)) { log.debug("read finish...{}, data:{}" , stamp, data); return data; } log.debug("updating to read lock... {}" , stamp); try { stamp = lock.readLock(); log.debug("read lock {}" , stamp); sleep(readTime); log.debug("read finish...{}, data:{}" , stamp, data); return data; } finally { log.debug("read unlock {}" , stamp); lock.unlockRead(stamp); } } public void write (int newData) { long stamp = lock.writeLock(); log.debug("write lock {}" , stamp); try { sleep(2 ); this .data = newData; } finally { log.debug("write unlock {}" , stamp); lock.unlockWrite(stamp); } } }
测试读-读
可以优化
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { DataContainerStamped dataContainer = new DataContainerStamped (1 ); new Thread (() -> { dataContainer.read(1 ); }, "t1" ).start(); sleep(0.5 ); new Thread (() -> { dataContainer.read(0 ); }, "t2" ).start(); }
输出结果,可以看到实际没有加读锁
1 2 3 4 15:58:50.217 c.DataContainerStamped [t1] - optimistic read locking...256 15:58:50.717 c.DataContainerStamped [t2] - optimistic read locking...256 15:58:50.717 c.DataContainerStamped [t2] - read finish...256, data:1 15:58:51.220 c.DataContainerStamped [t1] - read finish...256, data:1
测试读-写
时优化读补加读锁
1 2 3 4 5 6 7 8 9 10 public static void main (String[] args) { DataContainerStamped dataContainer = new DataContainerStamped (1 ); new Thread (() -> { dataContainer.read(1 ); }, "t1" ).start(); sleep(0.5 ); new Thread (() -> { dataContainer.write(100 ); }, "t2" ).start(); }
输出结果
1 2 3 4 5 6 7 15:57:00.219 c.DataContainerStamped [t1] - optimistic read locking...256 15:57:00.717 c.DataContainerStamped [t2] - write lock 384 15:57:01.225 c.DataContainerStamped [t1] - updating to read lock... 256 15:57:02.719 c.DataContainerStamped [t2] - write unlock 384 15:57:02.719 c.DataContainerStamped [t1] - read lock 513 15:57:03.719 c.DataContainerStamped [t1] - read finish...513, data:1000 15:57:03.719 c.DataContainerStamped [t1] - read unlock 513
注意
StampedLock 不支持条件变量 StampedLock 不支持可重入 8.4 Semaphore信号量 基本使用
[ˈsɛməˌfɔr] 信号量,用来限制能同时访问共享资源的线程上限 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public static void main (String[] args) { Semaphore semaphore = new Semaphore (3 ); for (int i = 0 ; i < 10 ; i++) { new Thread (() -> { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } try { log.debug("running..." ); sleep(1 ); log.debug("end..." ); } finally { semaphore.release(); } }).start(); } }
输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 07:35:15.485 c.TestSemaphore [Thread-2] - running... 07:35:15.485 c.TestSemaphore [Thread-1] - running... 07:35:15.485 c.TestSemaphore [Thread-0] - running... 07:35:16.490 c.TestSemaphore [Thread-2] - end... 07:35:16.490 c.TestSemaphore [Thread-0] - end... 07:35:16.490 c.TestSemaphore [Thread-1] - end... 07:35:16.490 c.TestSemaphore [Thread-3] - running... 07:35:16.490 c.TestSemaphore [Thread-5] - running... 07:35:16.490 c.TestSemaphore [Thread-4] - running... 07:35:17.490 c.TestSemaphore [Thread-5] - end... 07:35:17.490 c.TestSemaphore [Thread-4] - end... 07:35:17.490 c.TestSemaphore [Thread-3] - end... 07:35:17.490 c.TestSemaphore [Thread-6] - running... 07:35:17.490 c.TestSemaphore [Thread-7] - running... 07:35:17.490 c.TestSemaphore [Thread-9] - running... 07:35:18.491 c.TestSemaphore [Thread-6] - end... 07:35:18.491 c.TestSemaphore [Thread-7] - end... 07:35:18.491 c.TestSemaphore [Thread-9] - end... 07:35:18.491 c.TestSemaphore [Thread-8] - running... 07:35:19.492 c.TestSemaphore [Thread-8] - end...
说明:
Semaphore有两个构造器:Semaphore(int permits)
和Semaphore(int permits,boolean fair)
permits表示允许同时访问共享资源的线程数。 fair表示公平与否,与之前的ReentrantLock一样。 8.4.1 * Semaphore 应用 semaphore 限制对共享资源的使用
使用 Semaphore 限流,在访问高峰期时,让请求线程阻塞,高峰期过去再释放许可,当然它只适合限制单机 线程数量,并且仅是限制线程数,而不是限制资源数 (例如连接数,请对比 Tomcat LimitLatch 的实现) 用 Semaphore 实现简单连接池,对比『享元模式』下的实现(用wait notify),性能和可读性显然更好, 注意下面的实现中线程数和数据库连接数是相等的 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Slf4j(topic = "c.Pool") class Pool { private final int poolSize; private Connection[] connections; private AtomicIntegerArray states; private Semaphore semaphore; public Pool (int poolSize) { this .poolSize = poolSize; this .semaphore = new Semaphore (poolSize); this .connections = new Connection [poolSize]; this .states = new AtomicIntegerArray (new int [poolSize]); for (int i = 0 ; i < poolSize; i++) { connections[i] = new MockConnection ("连接" + (i+1 )); } } public Connection borrow () { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0 ; i < poolSize; i++) { if (states.get(i) == 0 ) { if (states.compareAndSet(i, 0 , 1 )) { log.debug("borrow {}" , connections[i]); return connections[i]; } } } return null ; } public void free (Connection conn) { for (int i = 0 ; i < poolSize; i++) { if (connections[i] == conn) { states.set(i, 0 ); log.debug("free {}" , conn); semaphore.release(); break ; } } } }
8.4.2 * Semaphore 原理 加锁解锁流程 Semaphore有点像一个停车场,permits就好像停车位数量,当线程获得了permits就像是获得了停车位,然后停车场显示空余车位减一。
刚开始,permits(state)为 3,这时 5 个线程来获取资源
假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞
这时 Thread-4 释放了 permits,状态如下
接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接 下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L ; NonfairSync(int permits) { super (permits); } public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared (int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if ( remaining < 0 || compareAndSetState(available, remaining) ) { return remaining; } } } private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } } public void release () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } protected final boolean tryReleaseShared (int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error ("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } }private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) { doReleaseShared(); } } }private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
加锁流程总结 acquire
->acquireSharedInterruptibly(1)
->tryAcquireShared(1)
->nonfairTryAcquireShared(1)
,如果资源用完了,返回负数,tryAcquireShared
返回负数,表示失败。否则返回正数,tryAcquireShared
返回正数,表示成功。如果成功,获取信号量成功。 如果失败,调用doAcquireSharedInterruptibly
,进入for循环:如果当前驱节点为头节点,调用tryAcquireShared
尝试获取锁如果结果大于等于0,表明获取锁成功,调用setHeadAndPropagate
,将当前节点设为头节点,之后又调用doReleaseShared
,唤醒后继节点。 调用shoudParkAfterFailure
,第一次调用返回false,并将前驱节点改为-1,第二次循环如果再进入此方法,会进入阻塞并检查打断的方法。 解锁流程总结 release
->sync.releaseShared(1)
->tryReleaseShared(1)
,只要不发生整数溢出,就返回true如果返回true,调用doReleaseShared
,唤醒后继节点。 如果返回false,解锁失败。 8.5 CountdownLatch倒计时锁 用来进行线程同步协作,等待所有线程完成倒计时。
其中构造参数用来初始化等待计数值,await() 用来等待计数归零,countDown() 用来让计数减一
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch (3 ); new Thread (() -> { log.debug("begin..." ); sleep(1 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }).start(); new Thread (() -> { log.debug("begin..." ); sleep(2 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }).start(); new Thread (() -> { log.debug("begin..." ); sleep(1.5 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }).start(); log.debug("waiting..." ); latch.await(); log.debug("wait end..." ); }
输出
1 2 3 4 5 6 7 8 18:44:00.778 c.TestCountDownLatch [main] - waiting... 18:44:00.778 c.TestCountDownLatch [Thread-2] - begin... 18:44:00.778 c.TestCountDownLatch [Thread-0] - begin... 18:44:00.778 c.TestCountDownLatch [Thread-1] - begin... 18:44:01.782 c.TestCountDownLatch [Thread-0] - end...2 18:44:02.283 c.TestCountDownLatch [Thread-2] - end...1 18:44:02.782 c.TestCountDownLatch [Thread-1] - end...0 18:44:02.782 c.TestCountDownLatch [main] - wait end...
相比于join,CountDownLatch能配合线程池使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch (3 ); ExecutorService service = Executors.newFixedThreadPool(4 ); service.submit(() -> { log.debug("begin..." ); sleep(1 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }); service.submit(() -> { log.debug("begin..." ); sleep(1.5 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }); service.submit(() -> { log.debug("begin..." ); sleep(2 ); latch.countDown(); log.debug("end...{}" , latch.getCount()); }); service.submit(()->{ try { log.debug("waiting..." ); latch.await(); log.debug("wait end..." ); } catch (InterruptedException e) { e.printStackTrace(); } }); }
* 应用之同步等待多线程准备完毕 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 AtomicInteger num = new AtomicInteger (0 );ExecutorService service = Executors.newFixedThreadPool(10 , (r) -> { return new Thread (r, "t" + num.getAndIncrement()); });CountDownLatch latch = new CountDownLatch (10 ); String[] all = new String [10 ];Random r = new Random ();for (int j = 0 ; j < 10 ; j++) { int x = j; service.submit(() -> { for (int i = 0 ; i <= 100 ; i++) { try { Thread.sleep(r.nextInt(100 )); } catch (InterruptedException e) { } all[x] = Thread.currentThread().getName() + "(" + (i + "%" ) + ")" ; System.out.print("\r" + Arrays.toString(all)); } latch.countDown(); }); } latch.await(); System.out.println("\n游戏开始..." ); service.shutdown();
中间输出
1 [t0(52%), t1(47%), t2(51%), t3(40%), t4(49%), t5(44%), t6(49%), t7(52%), t8(46%), t9(46%)]
最后输出
1 2 3 [t0(100%), t1(100%), t2(100%), t3(100%), t4(100%), t5(100%), t6(100%), t7(100%), t8(100%), t9(100%)] 游戏开始...
* 应用之同步等待多个远程调用结束 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 @RestController public class TestCountDownlatchController { @GetMapping("/order/{id}") public Map<String, Object> order (@PathVariable int id) { HashMap<String, Object> map = new HashMap <>(); map.put("id" , id); map.put("total" , "2300.00" ); sleep(2000 ); return map; } @GetMapping("/product/{id}") public Map<String, Object> product (@PathVariable int id) { HashMap<String, Object> map = new HashMap <>(); if (id == 1 ) { map.put("name" , "小爱音箱" ); map.put("price" , 300 ); } else if (id == 2 ) { map.put("name" , "小米手机" ); map.put("price" , 2000 ); } map.put("id" , id); sleep(1000 ); return map; } @GetMapping("/logistics/{id}") public Map<String, Object> logistics (@PathVariable int id) { HashMap<String, Object> map = new HashMap <>(); map.put("id" , id); map.put("name" , "中通快递" ); sleep(2500 ); return map; } private void sleep (int millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
rest远程调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 RestTemplate restTemplate = new RestTemplate (); log.debug("begin" );ExecutorService service = Executors.newCachedThreadPool();CountDownLatch latch = new CountDownLatch (4 ); Future<Map<String,Object>> f1 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/order/{1}" , Map.class, 1 ); return r; }); Future<Map<String, Object>> f2 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/product/{1}" , Map.class, 1 ); return r; }); Future<Map<String, Object>> f3 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/product/{1}" , Map.class, 2 ); return r; }); Future<Map<String, Object>> f4 = service.submit(() -> { Map<String, Object> r = restTemplate.getForObject("http://localhost:8080/logistics/{1}" , Map.class, 1 ); return r; }); System.out.println(f1.get()); System.out.println(f2.get()); System.out.println(f3.get()); System.out.println(f4.get()); log.debug("执行完毕" ); service.shutdown();
执行结果
1 2 3 4 5 6 19:51:39.711 c.TestCountDownLatch [main] - begin {total=2300.00, id =1} {price=300, name=小爱音箱, id =1} {price=2000, name=小米手机, id =2} {name=中通快递, id =1} 19:51:42.407 c.TestCountDownLatch [main] - 执行完毕
说明:
这种等待多个带有返回值的任务的场景,还是用future比较合适,CountdownLatch适合任务没有返回值的场景。 8.6 CyclicBarrier循环栅栏 CountdownLatch的缺点在于不能重用,见下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private static void test1 () { ExecutorService service = Executors.newFixedThreadPool(5 ); for (int i = 0 ; i < 3 ; i++) { CountDownLatch latch = new CountDownLatch (2 ); service.submit(() -> { log.debug("task1 start..." ); sleep(1 ); latch.countDown(); }); service.submit(() -> { log.debug("task2 start..." ); sleep(2 ); latch.countDown(); }); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("task1 task2 finish..." ); } service.shutdown(); }
想要重复使用CountdownLatch进行同步,必须创建多个CountDownLatch对象。
我 CyclicBarrier
[ˈsaɪklɪk ˈbæriɚ] 循环栅栏,用来进行线程协作,等待线程满足某个计数。 构造时设置『计数个数』,每个线程执 行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 CyclicBarrier cb = new CyclicBarrier (2 ); new Thread (()->{ System.out.println("线程1开始.." +new Date ()); try { cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程1继续向下运行..." +new Date ()); }).start();new Thread (()->{ System.out.println("线程2开始.." +new Date ()); try { Thread.sleep(2000 ); } catch (InterruptedException e) { } try { cb.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("线程2继续向下运行..." +new Date ()); }).start();
注意
CyclicBarrier 与 CountDownLatch 的主要区别在于 CyclicBarrier 是可以重用的 CyclicBarrier 可以被比 喻为『人满发车』 CountDownLatch的计数和阻塞方法是分开的两个方法,而CyclicBarrier是一个方法。 CyclicBarrier的构造器还有一个Runnable类型的参数,在计数为0时会执行其中的run方法。 8.7 线程安全集合类概述
线程安全集合类可以分为三大类:
遗留的线程安全集合如Hashtable
,Vector
使用Collections
装饰的线程安全集合,如:Collections.synchronizedCollection
Collections.synchronizedList
Collections.synchronizedMap
Collections.synchronizedSet
Collections.synchronizedNavigableMap
Collections.synchronizedNavigableSet
Collections.synchronizedSortedMap
Collections.synchronizedSortedSet
说明:以上集合均采用修饰模式设计,将非线程安全的集合包装后,在调用方法时包裹了一层synchronized代码块。其并发性并不比遗留的安全集合好。 java.util.concurrent.* 重点介绍java.util.concurrent.*
下的线程安全集合类,可以发现它们有规律,里面包含三类关键词: Blocking、CopyOnWrite、Concurrent
Blocking 大部分实现基于锁,并提供用来阻塞的方法 CopyOnWrite 之类容器修改开销相对较重 Concurrent 类型的容器内部很多操作使用 cas 优化,一般可以提供较高吞吐量 弱一致性遍历时弱一致性,例如,当利用迭代器遍历时,如果容器发生修改,迭代器仍然可以继续进行遍 历,这时内容是旧的 求大小弱一致性,size 操作未必是 100% 准确 读取弱一致性 遍历时如果发生了修改,对于非安全容器来讲,使用 fail-fast 机制也就是让遍历立刻失败,抛出 ConcurrentModificationException,不再继续遍历
8.7.1 ConcurrentHashMap 应用之单词计数 搭建练习环境:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 public class Test { public static void main (String[] args) { } public static <V> void calculate (Supplier<Map<String,V>> supplier, BiConsumer<Map<String,V>, List<String>> consumer) { Map<String, V> map = supplier.get(); CountDownLatch count = new CountDownLatch (26 ); for (int i = 1 ; i < 27 ; i++) { int k = i; new Thread (()->{ ArrayList<String> list = new ArrayList <>(); read(list,k); consumer.accept(map,list); count.countDown(); }).start(); } try { count.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(map.toString()); } public static void read (List<String> list,int i) { try { String element; BufferedReader reader = new BufferedReader (new FileReader (i + ".txt" )); while ((element = reader.readLine()) != null ){ list.add(element); } }catch (IOException e){ } } public void construct () { String str = "abcdefghijklmnopqrstuvwxyz" ; ArrayList<String> list = new ArrayList <>(); for (int i = 0 ; i < str.length(); i++) { for (int j = 0 ; j < 200 ; j++) { list.add(String.valueOf(str.charAt(i))); } } Collections.shuffle(list); for (int i = 0 ; i < 26 ; i++) { try (PrintWriter out = new PrintWriter (new FileWriter (i + 1 + ".txt" ))) { String collect = list.subList(i * 200 , (i + 1 ) * 200 ).stream().collect(Collectors.joining("\n" )); out.println(collect); } catch (IOException e) { e.printStackTrace(); } } } }
实现一:
1 2 3 4 5 6 7 8 9 10 11 12 13 demo( () -> new ConcurrentHashMap <String, Integer>(), (map, words) -> { for (String word : words) { Integer counter = map.get(word); int newValue = counter == null ? 1 : counter + 1 ; map.put(word, newValue); } } );
输出:
1 2 {a=186, b=192, c=187, d=184, e=185, f=185, g=176, h=185, i=193, j=189, k=187, l=157, m=189, n=181, o=180, p=178, q=185, r=188, s=181, t=183, u=177, v=186, w=188, x=178, y=189, z=186} 47
错误原因:
ConcurrentHashMap虽然每个方法都是线程安全的,但是多个方法的组合并不是线程安全的 。 正确答案一:
1 2 3 4 5 6 7 8 9 10 demo( () -> new ConcurrentHashMap <String, LongAdder>(), (map, words) -> { for (String word : words) { map.computeIfAbsent(word, (key) -> new LongAdder ()).increment(); } } );
说明:
computIfAbsent方法的作用是:当map中不存在以参数1为key对应的value时,会将参数2函数式接口的返回值作为value,put进map中,然后返回该value。如果存在key,则直接返回value 以上两部均是线程安全的。 正确答案二:
1 2 3 4 5 6 7 8 9 demo( () -> new ConcurrentHashMap <String, Integer>(), (map, words) -> { for (String word : words) { map.merge(word, 1 , Integer::sum); } } );
8.7.2 * ConcurrentHashMap 原理 1)JDK 7 HashMap 并发死链问题 测试代码
注意
要在 JDK 7 下运行,否则扩容机制和 hash 的计算方法都变了 以下测试代码是精心准备的,不要随便改动 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public static void main (String[] args) { System.out.println("长度为16时,桶下标为1的key" ); for (int i = 0 ; i < 64 ; i++) { if (hash(i) % 16 == 1 ) { System.out.println(i); } } System.out.println("长度为32时,桶下标为1的key" ); for (int i = 0 ; i < 64 ; i++) { if (hash(i) % 32 == 1 ) { System.out.println(i); } } final HashMap<Integer, Integer> map = new HashMap <Integer, Integer>(); map.put(2 , null ); map.put(3 , null ); map.put(4 , null ); map.put(5 , null ); map.put(6 , null ); map.put(7 , null ); map.put(8 , null ); map.put(9 , null ); map.put(10 , null ); map.put(16 , null ); map.put(35 , null ); map.put(1 , null ); System.out.println("扩容前大小[main]:" +map.size()); new Thread () { @Override public void run () { map.put(50 , null ); System.out.println("扩容后大小[Thread-0]:" +map.size()); } }.start(); new Thread () { @Override public void run () { map.put(50 , null ); System.out.println("扩容后大小[Thread-1]:" +map.size()); } }.start(); }final static int hash (Object k) { int h = 0 ; if (0 != h && k instanceof String) { return sun.misc.Hashing.stringHash32((String) k); } h ^= k.hashCode(); h ^= (h >>> 20 ) ^ (h >>> 12 ); return h ^ (h >>> 7 ) ^ (h >>> 4 ); }
死链复现 调试工具使用 idea
在 HashMap 源码 590 行加断点
1 int newCapacity = newTable.length;
断点的条件如下,目的是让 HashMap 在扩容为 32 时,并且线程为 Thread-0 或 Thread-1 时停下来
1 2 3 4 5 newTable.length==32 && ( Thread.currentThread().getName().equals("Thread-0" )|| Thread.currentThread().getName().equals("Thread-1" ) )
断点暂停方式选择 Thread,否则在调试 Thread-0 时,Thread-1 无法恢复运行
运行代码,程序在预料的断点位置停了下来,输出
1 2 3 4 5 6 7 8 9 长度为16时,桶下标为1的key 1 16 35 50 长度为32时,桶下标为1的key 1 35 扩容前大小[main]:12
接下来进入扩容流程调试
在 HashMap 源码 594 行加断点
1 2 3 Entry<K,V> next = e.next; if (rehash)
这是为了观察 e 节点和 next 节点的状态,Thread-0 单步执行到 594 行,再 594 处再添加一个断点(条件 Thread.currentThread().getName().equals("Thread-0"))
这时可以在 Variables 面板观察到 e 和 next 变量,使用view as -> Object
查看节点状态
1 2 e (1 )->(35 )->(16 )->null next (35 ) ->(16 )->null
在 Threads 面板选中 Thread-1 恢复运行,可以看到控制台输出新的内容如下,Thread-1 扩容已完成
1 newTable[1 ] (35 )->(1 )->null
这时 Thread-0 还停在 594 处, Variables 面板变量的状态已经变化为
1 2 e (1 )->null next (35 ) ->(1 )->null
为什么呢,因为 Thread-1 扩容时链表也是后加入的元素放入链表头,因此链表就倒过来了,但 Thread-1 虽然结 果正确,但它结束后 Thread-0 还要继续运行
接下来就可以单步调试(F8)观察死链的产生了
下一轮循环到 594,将 e 搬迁到 newTable 链表头
1 2 3 newTable[1 ] (1 )->null e (35 ) ->(1 )->null next (1 ) ->null
下一轮循环到 594,将 e 搬迁到 newTable 链表头
1 2 3 newTable[1 ] (35 )->(1 )->null e (1 ) ->null next null
再看看源码
1 2 3 4 5 6 7 e.next = newTable[1 ]; newTable[1 ] = e; e = next;
源码分析
HashMap 的并发死链发生在扩容时
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void transfer (Entry[] newTable, boolean rehash) { int newCapacity = newTable.length; for (Entry<K,V> e : table) { while (null != e) { Entry<K,V> next = e.next; if (rehash) { e.hash = null == e.key ? 0 : hash(e.key); } int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next; } } }
假设 map 中初始元素是
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 原始链表,格式:[下标] (key,next) [1 ] (1 ,35 )->(35 ,16 )->(16 ,null ) 线程 a 执行到 1 处 ,此时局部变量 e 为 (1 ,35 ),而局部变量 next 为 (35 ,16 ) 线程 a 挂起 线程 b 开始执行 第一次循环 [1 ] (1 ,null ) 第二次循环 [1 ] (35 ,1 )->(1 ,null ) 第三次循环 [1 ] (35 ,1 )->(1 ,null ) [17 ] (16 ,null ) 切换回线程 a,此时局部变量 e 和 next 被恢复,引用没变但内容变了:e 的内容被改为 (1 ,null ),而 next 的内 容被改为 (35 ,1 ) 并链向 (1 ,null ) 第一次循环 [1 ] (1 ,null ) 第二次循环,注意这时 e 是 (35 ,1 ) 并链向 (1 ,null ) 所以 next 又是 (1 ,null ) [1 ] (35 ,1 )->(1 ,null ) 第三次循环,e 是 (1 ,null ),而 next 是 null ,但 e 被放入链表头,这样 e.next 变成了 35 (2 处) [1 ] (1 ,35 )->(35 ,1 )->(1 ,35 ) 已经是死链了
小结
究其原因,是因为在多线程环境下使用了非线程安全的 map 集合 JDK 8 虽然将扩容算法做了调整,不再将元素加入链表头(而是保持与扩容前一样的顺序),但仍不意味着能 够在多线程环境下能够安全扩容,还会出现其它问题(如扩容丢数据) 2)JDK 8 ConcurrentHashMap 重要属性和内部类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private transient volatile int sizeCtl;static class Node <K,V> implements Map .Entry<K,V> {}transient volatile Node<K,V>[] table;private transient volatile Node<K,V>[] nextTable;static final class ForwardingNode <K,V> extends Node <K,V> {}static final class ReservationNode <K,V> extends Node <K,V> {}static final class TreeBin <K,V> extends Node <K,V> {}static final class TreeNode <K,V> extends Node <K,V> {}
重要方法 1 2 3 4 5 6 7 8 static final <K,V> Node<K,V> tabAt (Node<K,V>[] tab, int i) static final <K,V> boolean casTabAt (Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) static final <K,V> void setTabAt (Node<K,V>[] tab, int i, Node<K,V> v)
构造器源码分析 可以看到实现了懒惰初始化,在构造方法中仅仅计算了 table 的大小,以后在第一次使用时才会真正创建
1 2 3 4 5 6 7 8 9 10 11 public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f ) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException (); if (initialCapacity < concurrencyLevel) initialCapacity = concurrencyLevel; long size = (long )(1.0 + (long )initialCapacity / loadFactor); int cap = (size >= (long )MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int )size); this .sizeCtl = cap; }
get流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public V get (Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1 ) & h)) != null ) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0 ) return (p = e.find(h, key)) != null ? p.val : null ; while ((e = e.next) != null ) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null ; }
总结:
如果table不为空且长度大于0且索引位置有元素if 头节点key的hash值相等 else if 头节点的hash为负数(bin在扩容或者是treebin) 进入循环(e不为空):节点key的hash值相等,且key指向同一个地址或equals 返回null put 流程 以下数组简称(table),链表简称(bin)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 public V put (K key, V value) { return putVal(key, value, false ); }final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException (); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new Node <K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node <K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0 ) { if ((sc = sizeCtl) < 0 ) Thread.yield(); else if (U.compareAndSwapInt(this , SIZECTL, sc, -1 )) { try { if ((tab = table) == null || tab.length == 0 ) { int n = (sc > 0 ) ? sc : DEFAULT_CAPACITY; Node<K,V>[] nt = (Node<K,V>[])new Node <?,?>[n]; table = tab = nt; sc = n - (n >>> 2 ); } } finally { sizeCtl = sc; } break ; } } return tab; }private final void addCount (long x, int check) { CounterCell[] as; long b, s; if ( (as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x) ) { CounterCell a; long v; int m; boolean uncontended = true ; if ( as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) ) { fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; s = sumCount(); } if (check >= 0 ) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long )(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) transfer(tab, nt); } else if (U.compareAndSwapInt(this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) transfer(tab, null ); s = sumCount(); } } }
总结:
进入for循环:if table为null或者长度 为0 else if 索引处无节点创建节点,填入key和value,放入table,退出循环 else if 索引处节点的hash值为MOVE(ForwardingNode),表示正在扩容和迁移 else锁住头节点if 再次确认头节点没有被移动if 头节点hash值大于0(表示这是一个链表) else if 节点为红黑树节点调用putTreeVal
查看是否有对应key的数节点如果有且为覆盖模式,将值覆盖,返回旧值 如果没有,创建并插入,返回null 解锁 if binCount不为0如果binCount大于树化阈值8 如果旧值不为null break 增加size计数 return null size 计算流程 size 计算实际发生在 put,remove 改变集合元素的操作之中
没有竞争发生,向 baseCount 累加计数 有竞争发生,新建 counterCells,向其中的一个 cell 累加计counterCells 初始有两个 cell 如果计数竞争比较激烈,会创建新的 cell 来累加计数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public int size () { long n = sumCount(); return ((n < 0L ) ? 0 : (n > (long )Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int )n); }final long sumCount () { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; }
总结 Java 8 数组(Node) +( 链表 Node | 红黑树 TreeNode ) 以下数组简称(table),链表简称(bin)
初始化,使用 cas 来保证并发安全,懒惰初始化 table 树化,当 table.length < 64 时,先尝试扩容,超过 64 时,并且 bin.length > 8 时,会将链表树化,树化过程 会用 synchronized 锁住链表头 put,如果该 bin 尚未创建,只需要使用 cas 创建 bin;如果已经有了,锁住链表头进行后续 put 操作,元素 添加至 bin 的尾部 get,无锁操作仅需要保证可见性,扩容过程中 get 操作拿到的是 ForwardingNode 它会让 get 操作在新 table 进行搜索 扩容,扩容时以 bin 为单位进行,需要对 bin 进行 synchronized,但这时妙的是其它竞争线程也不是无事可 做,它们会帮助把其它 bin 进行扩容,扩容时平均只有 1/6 的节点会把复制到新 table 中 size,元素个数保存在 baseCount 中,并发时的个数变动保存在 CounterCell[] 当中。最后统计数量时累加 即可 源码分析 http://www.importnew.com/28263.html
其它实现 Cliff Click's high scale lib
3)JDK 7 ConcurrentHashMap 它维护了一个 segment 数组,每个 segment 对应一把锁
优点:如果多个线程访问不同的 segment,实际是没有冲突的,这与 jdk8 中是类似的 缺点:Segments 数组默认大小为16,这个容量初始化指定后就不能改变了,并且不是懒惰初始化 构造器分析 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0 ) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException (); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; int sshift = 0 ; int ssize = 1 ; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1 ; } this .segmentShift = 32 - sshift; this .segmentMask = ssize - 1 ; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1 ; Segment<K,V> s0 = new Segment <K,V>(loadFactor, (int )(cap * loadFactor), (HashEntry<K,V>[])new HashEntry [cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment [ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); this .segments = ss; }
构造完成,如下图所示
可以看到 ConcurrentHashMap 没有实现懒惰初始化,空间占用不友好
其中 this.segmentShift 和 this.segmentMask 的作用是决定将 key 的 hash 结果匹配到哪个 segment
例如,根据某一 hash 值求 segment 位置,先将高位向低位移动 this.segmentShift 位
结果再与 this.segmentMask 做位于运算,最终得到 1010 即下标为 10 的 segment
put 流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public V put (K key, V value) { Segment<K,V> s; if (value == null ) throw new NullPointerException (); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null ) { s = ensureSegment(j); } return s.put(key, hash, value, false ); }
segment 继承了可重入锁(ReentrantLock),它的 put 方法为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 final V put (K key, int hash, V value, boolean onlyIfAbsent) { HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1 ) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null ) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break ; } e = e.next; } else { if (node != null ) node.setNext(first); else node = new HashEntry <K,V>(hash, key, value, first); int c = count + 1 ; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null ; break ; } } } finally { unlock(); } return oldValue; }
rehash 流程 发生在 put 中,因为此时已经获得了锁,因此 rehash 时不需要考虑线程安全
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 private void rehash (HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1 ; threshold = (int )(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry [newCapacity]; int sizeMask = newCapacity - 1 ; for (int i = 0 ; i < oldCapacity ; i++) { HashEntry<K,V> e = oldTable[i]; if (e != null ) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; if (next == null ) newTable[idx] = e; else { HashEntry<K,V> lastRun = e; int lastIdx = idx; for (HashEntry<K,V> last = next; last != null ; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry <K,V>(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
附,调试代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public static void main (String[] args) { ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap <>(); for (int i = 0 ; i < 1000 ; i++) { int hash = hash(i); int segmentIndex = (hash >>> 28 ) & 15 ; if (segmentIndex == 4 && hash % 8 == 2 ) { System.out.println(i + "\t" + segmentIndex + "\t" + hash % 2 + "\t" + hash % 4 + "\t" + hash % 8 ); } } map.put(1 , "value" ); map.put(15 , "value" ); map.put(169 , "value" ); map.put(197 , "value" ); map.put(341 , "value" ); map.put(484 , "value" ); map.put(545 , "value" ); map.put(912 , "value" ); map.put(941 , "value" ); System.out.println("ok" ); }private static int hash (Object k) { int h = 0 ; if ((0 != h) && (k instanceof String)) { return sun.misc.Hashing.stringHash32((String) k); } h ^= k.hashCode(); h += (h << 15 ) ^ 0xffffcd7d ; h ^= (h >>> 10 ); h += (h << 3 ); h ^= (h >>> 6 ); h += (h << 2 ) + (h << 14 ); int v = h ^ (h >>> 16 ); return v; }
get 流程 get 时并未加锁,用了 UNSAFE 方法保证了可见性,扩容过程中,get 先发生就从旧表取内容,get 后发生就从新 表取内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public V get (Object key) { Segment<K,V> s; HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null ) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long )(((tab.length - 1 ) & h)) << TSHIFT) + TBASE); e != null ; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null ; }
size 计算流程 计算元素个数前,先不加锁计算两次,如果前后两次结果如一样,认为个数正确返回 如果不一样,进行重试,重试次数超过 3,将所有 segment 锁住,重新计算个数返回 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public int size () { final Segment<K,V>[] segments = this .segments; int size; boolean overflow; long sum; long last = 0L ; int retries = -1 ; try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0 ; j < segments.length; ++j) ensureSegment(j).lock(); } sum = 0L ; size = 0 ; overflow = false ; for (int j = 0 ; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null ) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0 ) overflow = true ; } } if (sum == last) break ; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0 ; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }
8.8 BlockingQueue 8.8.1 * LinkedBlockingQueue 原理 基本的入队出队 1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class LinkedBlockingQueue <E> extends AbstractQueue <E> implements BlockingQueue <E>, java.io.Serializable { static class Node <E> { E item; Node<E> next; Node(E x) { item = x; } } }
初始化链表
last = head = new Node(null);
Dummy 节点用来占位,item 为 null
当一个节点入队
last = last.next = node;
再来一个节点入队last = last.next = node;
出队
1 2 3 4 5 6 7 8 9 10 11 Node<E> h = head; Node<E> first = h.next; h.next = h; head = first;E x = first.item; first.item = null ;return x;
h = head
first = h.next
h.next = h
head = first
1 2 3 E x = first.item; first.item = null ;return x;
加锁分析 高明之处 在于用了两把锁和 dummy 节点
用一把锁,同一时刻,最多只允许有一个线程(生产者或消费者,二选一)执行 用两把锁,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行消费者与消费者线程仍然串行 生产者与生产者线程仍然串行 线程安全分析
当节点总数大于 2 时(包括 dummy 节点),putLock 保证的是 last 节点的线程安全,takeLock 保证的是 head 节点的线程安全。两把锁保证了入队和出队没有竞争 当节点总数等于 2 时(即一个 dummy 节点,一个正常节点)这时候,仍然是两把锁锁两个对象,不会竞争 当节点总数等于 1 时(就一个 dummy 节点)这时 take 线程会被 notEmpty 条件阻塞,有竞争,会阻塞 1 2 3 4 private final ReentrantLock putLock = new ReentrantLock ();private final ReentrantLock takeLock = new ReentrantLock ();
put 操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException (); int c = -1 ; Node<E> node = new Node <E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); }
take 操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public E take () throws InterruptedException { E x; int c = -1 ; final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull() return x; }
由 put 唤醒 put 是为了避免信号不足
性能比较 主要列举 LinkedBlockingQueue 与 ArrayBlockingQueue 的性能比较
Linked 支持有界,Array 强制有界 Linked 实现是链表,Array 实现是数组 Linked 是懒惰的,而 Array 需要提前初始化 Node 数组 Linked 每次入队会生成新 Node,而 Array 的 Node 是提前创建好的 Linked 两把锁,Array 一把锁 8.9 ConcurrentLinkedQueue ConcurrentLinkedQueue 的设计与 LinkedBlockingQueue 非常像,也是
两把【锁】,同一时刻,可以允许两个线程同时(一个生产者与一个消费者)执行 dummy 节点的引入让两把【锁】将来锁住的是不同对象,避免竞争 只是这【锁】使用了 cas 来实现 事实上,ConcurrentLinkedQueue 应用还是非常广泛的
例如之前讲的 Tomcat 的 Connector 结构时,Acceptor 作为生产者向 Poller 消费者传递事件信息时,正是采用了 ConcurrentLinkedQueue 将 SocketChannel 给 Poller 使用
*ConcurrentLinkedQueue 原理 模仿 ConcurrentLinkedQueue 初始代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 package cn.itcast.concurrent.thirdpart.test;import java.util.Collection;import java.util.Iterator;import java.util.Queue;import java.util.concurrent.atomic.AtomicReference;public class Test3 { public static void main (String[] args) { MyQueue<String> queue = new MyQueue <>(); queue.offer("1" ); queue.offer("2" ); queue.offer("3" ); System.out.println(queue); } }class MyQueue <E> implements Queue <E> { @Override public String toString () { StringBuilder sb = new StringBuilder (); for (Node<E> p = head; p != null ; p = p.next.get()) { E item = p.item; if (item != null ) { sb.append(item).append("->" ); } } sb.append("null" ); return sb.toString(); } @Override public int size () { return 0 ; } @Override public boolean isEmpty () { return false ; } @Override public boolean contains (Object o) { return false ; } @Override public Iterator<E> iterator () { return null ; } @Override public Object[] toArray() { return new Object [0 ]; } @Override public <T> T[] toArray(T[] a) { return null ; } @Override public boolean add (E e) { return false ; } @Override public boolean remove (Object o) { return false ; } @Override public boolean containsAll (Collection<?> c) { return false ; } @Override public boolean addAll (Collection<? extends E> c) { return false ; } @Override public boolean removeAll (Collection<?> c) { return false ; } @Override public boolean retainAll (Collection<?> c) { return false ; } @Override public void clear () { } @Override public E remove () { return null ; } @Override public E element () { return null ; } @Override public E peek () { return null ; } public MyQueue () { head = last = new Node <>(null , null ); } private volatile Node<E> last; private volatile Node<E> head; private E dequeue () { return null ; } @Override public E poll () { return null ; } @Override public boolean offer (E e) { return true ; } static class Node <E> { volatile E item; public Node (E item, Node<E> next) { this .item = item; this .next = new AtomicReference <>(next); } AtomicReference<Node<E>> next; } }
offer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public boolean offer (E e) { Node<E> n = new Node <>(e, null ); while (true ) { AtomicReference<Node<E>> next = last.next; if (next.compareAndSet(null , n)) { last = n; return true ; } } }
8.10 CopyOnWriteArrayList CopyOnWriteArraySet
是它的马甲 底层实现采用了 写入时拷贝 的思想,增删改操作会将底层数组拷贝一份,更 改操作在新数组上执行,这时不影响其它线程的并发读 ,读写分离 。 以新增为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public boolean add (E e) { synchronized (lock) { Object[] es = getArray(); int len = es.length; es = Arrays.copyOf(es, len + 1 ); es[len] = e; setArray(es); return true ; } }
这里的源码版本是 Java 11,在 Java 1.8 中使用的是可重入锁而不是 synchronized
其它读操作并未加锁,例如:
1 2 3 4 5 6 7 public void forEach (Consumer<? super E> action) { Objects.requireNonNull(action); for (Object x : getArray()) { @SuppressWarnings("unchecked") E e = (E) x; action.accept(e); } }
适合『读多写少』的应用场景
get 弱一致性
1 Thread-0 getArray() 2 Thread-1 getArray() 3 Thread-1 setArray(arrayCopy) 4 Thread-0 array[index]
不容易测试,但问题确实存在
迭代器弱一致性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList <>(); list.add(1 ); list.add(2 ); list.add(3 ); Iterator<Integer> iter = list.iterator();new Thread (() -> { list.remove(0 ); System.out.println(list); }).start(); sleep1s();while (iter.hasNext()) { System.out.println(iter.next()); }
不要觉得弱一致性就不好
数据库的 MVCC 都是弱一致性的表现 并发高和一致性是矛盾的,需要权衡