J.U.C并发包提供了几个非常有用的、用于并发流程控制的CountDownLatch、CyclicBarrier、Semaphore、类等。
1,CountDownLatch,闭锁。实现类似计数器的功能。CountDownLatch的常用API如下:
CountDownLatch(int count) // 构造方法,接受一个int类型参数表示总计数void await() throws InterruptedException // 阻塞当前线程,直到计数=0,或者线程被中断boolean await(long timeout, TimeUnit unit) throws InterruptedException // 一段时间内阻塞当前线程。如果计数=0被唤醒则返回true,如果超时被唤醒则返回falsevoid countDown() // 将计数-1long getCount() // 获取当前计数值,常用于debug
举个例子。10个运动员比赛百米赛跑,只有这10个运动员都准备好之后,发令员才能发枪。
public class MyTest4CountDownLatch { public static void main(String[] args) { int playerNumbs = 10; // 运动员数量 CountDownLatch cdl = new CountDownLatch(playerNumbs); // 创建一个CountDownLatch对象(cdl的初始计数=10) Thread starter = new Starter(cdl); // 创建一个发令员 Listplayers = new ArrayList<>(playerNumbs); // 创建10个运动员(放到一个list集合中方便操作) for (int i = 0; i < playerNumbs; i++) { Thread player = new Player(i, cdl); players.add(player); } starter.start(); for (int i = 0; i < playerNumbs; i++) { players.get(i).start(); } } }class Player extends Thread{ private CountDownLatch cdl; private int number; Player(int number, CountDownLatch cdl){ this.cdl = cdl; this.number = number; } @Override public void run() { try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("运动员" + number + "准备好了."); cdl.countDown(); // 该运动员准备就绪(cdl的计数-1) }}class Starter extends Thread { private CountDownLatch cdl; Starter(CountDownLatch cdl){ this.cdl = cdl; } @Override public void run() { System.out.println("发令员举起手枪,等待所有运动员准备就绪."); try { cdl.await(); // 等待所有运动员准备就绪(等待cdl的计数=0) } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("所有运动员已准备就绪,发令员发枪。"); }}
2,CyclicBarrier,循环栅栏。等待线程的数量达到目标的时候,所有等待的线程同时执行。可重置。CyclicBarrier的常用API如下:
CyclicBarrier(int parties) // 构造方法,指明需要等待的计数CyclicBarrier(int parties, Runnable barrierAction) // 构造方法,指明需要等待的计数 和 计数=0时的触发操作int getParties() // 返回初始化指定的partiesint await() throws InterruptedException, BrokenBarrierException // 阻塞当前线程。返回剩余等待计数。如果返回paties-1则表示是第1个到达。如果返回0则表示最后一个到达int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException // 一段时间内阻塞等待。响应线程中断标识。返回值同await()boolean isBroken() // 如果当前cyclicBarrier对象处于broken状态则返回truevoid reset() // 重置计数int getNumberWaiting() // 返回剩余计数
CyclicBarrier是一个所有线程要么全通过,要么全不通过的工具。如果有线程调用await(long timeout, TimeUnit unit) 超时通过,则CyclicBarrier处于broken状态,其他别的正在等待的线程会收到InterruptedException, 后续调用await()的线程会收到BrokenBarrierException。
举个例子。地铁的修建是按分段来修建的,只有所有的分段都施工完成之后,才可以通车。
public class MyTest4CyclicBarrier { public static void main(String[] args) throws InterruptedException { int lineSegementNumbs = 3; CyclicBarrier cb = new CyclicBarrier(lineSegementNumbs); // 创建CyclicBarrier对象 (cb计数=3) for (int i = 0; i < lineSegementNumbs; i++) { new LineSegment(i, cb).start(); } Thread.sleep(1000); System.out.println("CyclicBarrier重用"); // 自动重置计数 for (int i = 0; i < lineSegementNumbs; i++) { new LineSegment(i, cb).start(); } }}class LineSegment extends Thread{ private int segementNum; private CyclicBarrier cb; LineSegment(int segementNum, CyclicBarrier cb){ this.segementNum = segementNum; this.cb = cb; } @Override public void run() { try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线路段" + segementNum +"已施工完成."); try { cb.await(); // cb计数-1,等待cb计数=0 } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } System.out.println("通车了..."); } }
3,Semaphore,信号量。通常用来表示许可数量,用于限制可以访问某些资源(物理或逻辑的)的线程数目。Semaphore的常用API如下:
Semaphore(int permits) // 构造方法,指定许可数量,默认非公平Semaphore(int permits, boolean fair) // 构造方法,指定许可数量,制定是否公平获取void acquire() throws InterruptedException // 阻塞等待许可。阻塞期间响应线程的中断标识void acquireUninterruptibly() // 阻塞等待许可,不相应线程中断标识。boolean tryAcquire() // 尝试获取许可。如果失败则返回false,如果获取成功则返回trueboolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException // 一段时间内阻塞等待许可,阻塞期间响应线程的中断标识void release() // 释放一个许可// 以下API中,int类型的permits参数表示一次获取或释放多个许可void acquire(int permits) throws InterruptedException // 同 acquire()void acquireUninterruptibly(int permits) // 同 acquireUninterruptibly()boolean tryAcquire(int permits) // 同 tryAcquire()boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException // 同 tryAcquire(long timeout, TimeUnit unit)void release(int permits) // 同 release()// 以下API常用于控制、监控或debugint availablePermits() // 剩余可用的许可数量int drainPermits() // 将所有剩余许可置为0int getQueueLength() // 等待获取许可的线程数量
举个例子。桥每次最多能通过两个人,每个人通过桥的时间为10秒,桥东西两侧各有10个人同时等待准备过桥。输出每次过桥人的姓名、过桥方向和过桥时间。
public class MyTest4Semaphore { public static void main(String[] args) throws InterruptedException { String[] gruopEast = {"张三0", "张三1", "张三2", "张三3", "张三4", "张三5", "张三6", "张三7", "张三8", "张三9"}; String[] gruopWest = {"李四0", "李四1", "李四2", "李四3", "李四4", "李四5", "李四6", "李四7", "李四8", "李四9"}; int takeTime = 10; CyclicBarrier cb = new CyclicBarrier(20); // 使用CyclicBarrier来达到“同时准备过桥”的目的。 Semaphore semaphore = new Semaphore(2); for (int i = 0; i < gruopEast.length; i++) { new Player(gruopEast[i], "西", takeTime, cb, semaphore).start(); } for (int i = 0; i < gruopWest.length; i++) { new Player(gruopWest[i], "东", takeTime, cb, semaphore).start(); } }}class Player extends Thread { private String name; private String destination; private int takeTimeSeconds; private CyclicBarrier cb; private Semaphore semaphore; public Player(String name, String destination, int takeTimeSeconds, CyclicBarrier cb, Semaphore semaphore) { super(); this.name = name; this.destination = destination; this.takeTimeSeconds = takeTimeSeconds; this.cb = cb; this.semaphore = semaphore; } @Override public void run(){ try { cb.await(); semaphore.acquire(); // 阻塞获取许可 System.out.println(name + "准备向" + destination + "过桥,需要花费" + takeTimeSeconds + "秒"); Thread.sleep(1000 * takeTimeSeconds); semaphore.release(); // 释放许可 } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (BrokenBarrierException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}
4,Phaser,JDK1.7新增,功能上比CyclicBarrier、CountDownLatch更强大,提供更为丰富的API。常用于多线程参与多阶段完成的场景。在不同阶段,可以等待不同数量人的人完成,再进入下一阶段。在进入下一个阶段的时候,用户还可以重写onAdvance()来实现更佳自定义更加灵活的场景。Phaser的常用API如下:
Phaser(int parties) // 构造方法,指明有多少个线程参与int register() // 当前线程注册到当前阶段int arrive() // 当前线程已完成自己的工作int arriveAndDeregister() // 当前线程已完成自己的工作,并取消注册(不参与下一个阶段)。int arriveAndAwaitAdvance() // 当前线程已完成自己的工作,等待进入一下个阶段(参与下一个阶段)int awaitAdvance(int phase) // 阻塞等待第phase个阶段int awaitAdvanceInterruptibly(int phase) // 阻塞等待第phase个阶段,阻塞期间响应线程中断标识int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) // 阻塞等待第phase个阶段一段时间,阻塞期间响应线程中断标识void forceTermination() // 强制将该phaser对象设置为中指状态int getPhase() // 获取当前第phase个阶段int getRegisteredParties() // 获取当前阶段的注册数量int getArrivedParties() // 获取当前阶段已经完成的线程数量int getUnarrivedParties() // 获取当前阶段还未完成的线程数量boolean isTerminated() // 判断该phaser是否可用boolean onAdvance(int phase, int registeredParties) // protected方法,当一个阶段完成后触发,进入下一个阶段之前的动作。
每个Phaser实例都会维护一个phase number,初始值为0。每当所有注册的任务都到达Phaser时,phase number累加,并在超过Integer.MAX_VALUE后清零。arrive()和arriveAndDeregister()方法用于记录到达;其中arrive(),某个参与者完成任务后调用;arriveAndDeregister(),任务完成,取消自己的注册。arriveAndAwaitAdvance(),自己完成等待其他参与者完成,进入阻塞,直到Phaser成功进入下个阶段。
举个例子。有一个项目,分为4个阶段完成。第一个阶段有工人A,工人B,工人C共同参与。第二阶段由工人C、工人D共同完成,第三阶段由工人B、工人E共同完成,第四阶段由工人A、B、C共同完成。前一个阶段完成之后,才能进入下一个阶段的工作。每个阶段完成之后,先向经理汇报,然后进入下一阶段。
public class MyTest4Phaser { public static void main(String[] args) { Phaser phaser = new MyProject(5); int[] workerAPhaseArray = new int[] {0, 3}; int[] workerBPhaseArray = new int[] {0, 2, 3}; int[] workerCPhaseArray = new int[] {0, 1, 3}; int[] workerDPhaseArray = new int[] {1}; int[] workerEPhaseArray = new int[] {2}; int phaseAmount = 4; Worker workerA = new Worker("A", workerAPhaseArray, phaser, phaseAmount); Worker workerB = new Worker("B", workerBPhaseArray, phaser, phaseAmount); Worker workerC = new Worker("C", workerCPhaseArray, phaser, phaseAmount); Worker workerD = new Worker("D", workerDPhaseArray, phaser, phaseAmount); Worker workerE = new Worker("E", workerEPhaseArray, phaser, phaseAmount); workerA.start(); workerB.start(); workerC.start(); workerD.start(); workerE.start(); }}class MyProject extends Phaser { public MyProject(int parties){ super(parties); } @Override public boolean onAdvance(int currenPphase, int registeredParties) { if (currenPphase < 3) { System.out.println("通知经理:已经完成了第" + currenPphase + "阶段任务,准备执行第" + (currenPphase + 1) + "阶段任务。"); }else { System.out.println("通知经理:已完成了所有任务"); } return registeredParties == 0; }}class Worker extends Thread { protected String name; protected Phaser phaser; private int[] phaseArray; private int phaseAmount; public Worker (String name,int[] phaseArray, Phaser phaser, int phaseAmount){ super(name); this.name = name; this.phaseArray = phaseArray; this.phaser = phaser; this.phaseAmount = phaseAmount; if (phaseArray == null || phaseArray.length == 0) throw new RuntimeException("工人参与的阶段错误"); } public void doWork(){ Setset = intArray2Set(phaseArray); int lastPhase = phaseArray[phaseArray.length-1]; for (int i = 0; i < phaseAmount && (!phaser.isTerminated()); i++) { int currentPhase = phaser.getPhase(); if (set.contains(currentPhase)) { outPrint(currentPhase); if (lastPhase == currentPhase) { phaser.arriveAndDeregister(); break; }else { phaser.arriveAndAwaitAdvance(); } }else{ phaser.arriveAndAwaitAdvance(); } } } public Set intArray2Set(int[] phaseArray){ return Arrays.stream(phaseArray).boxed().collect(Collectors.toSet()); } protected void outPrint(int i){ try { System.out.println("工人" + name + "正在执行第" + i + "阶段任务"); Thread.sleep(1000); System.out.println("工人" + name + "执行完成第" + i + "阶段任务"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void run() { doWork(); }}
5,Exchanger<V>,线程间的数据交换器。两个线程在一个安全点彼此交换数据。该类比较简单,就3个API:
public Exchanger() // V exchange(V x) throws InterruptedException // V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException //
第一个调用exchange()的线程会阻塞等待,直到第二个线程调用exchange()来完成彼此数据的交换。
举个例子。飞机驾驶员有主飞和副飞,重要消息需要二者互相确认的。
public class MyTest4Exchanger { public static void main(String[] args) { Exchangerexchanger = new Exchanger<>(); Thread primary = new Pilot("主飞", "10分钟后降落", exchanger); Thread secondary = new Pilot("副飞", "10分钟后降落", exchanger); primary.start(); secondary.start(); }}class Pilot extends Thread { private String pilotName; private String receivedMsg; private Exchanger exchanger; Pilot(String pilotName, String receivedMsg, Exchanger exchanger){ this.pilotName = pilotName; this.receivedMsg = receivedMsg; this.exchanger = exchanger; } @Override public void run() { String ownReceivedMsg = receivedMsg; String otherReceivedMsg = null; try { otherReceivedMsg = exchanger.exchange(ownReceivedMsg); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(pilotName + "汇报:" + (ownReceivedMsg.equals(otherReceivedMsg) ? "消息一致." : "不消息一致.")); } }
以上介绍的5个工具类均为线程间的互相通讯的工具类。还有一个线程私有的工具类,ThreadLocal。不过,ThreadLocal是存在于java.lang包下的。
6,ThreadLocal<T>,本地线程变量。ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。ThreadLocal的API也比较简单:
ThreadLocal() // 构造方法T get() // 获取当前线程中存储的本地变量void set(T value) // 将value设置到当前线程的本地变量中存储void remove() // 删除当前线程中存储的本地变量
举个例子。在多数据源处理中,以读写分离为例,可以将数据源的标识放到ThreadLocal中,使用aop来自动完成切换工作。本例就简单的模拟一下。(例子中不采用AOP了,直接代码中体现)
public class MyTest4ThreadLocal { public static void main(String[] args) { Mapname2DataSource = new HashMap<>(); // 缓存每个datasource,value以String代替 name2DataSource.put("read", "ReadDataBase"); name2DataSource.put("write", "WriteDataBase"); for(int i = 0; i < 10; i++){ new BussinessThread(i, name2DataSource).start(); } }}class DataSourceHolder { private static final ThreadLocal dataSources = new ThreadLocal<>(); public static void setDataSourceKey(String customType) { dataSources.set(customType); } public static String getDataSourceKey() { return (String) dataSources.get(); } public static void clearDataSourceKey() { dataSources.remove(); }}class BussinessThread extends Thread { private Map name2DataSource; private int number; BussinessThread(int number, Map name2DataSource){ this.number = number; this.name2DataSource = name2DataSource; } @Override public void run() { System.out.println("业务线程" + number + "准备进行读操作"); DataSourceHolder.setDataSourceKey("read"); try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("业务线程" + number + "读操作对应的数据库是:" + name2DataSource.get(DataSourceHolder.getDataSourceKey())); DataSourceHolder.clearDataSourceKey(); // 再测试写操作。 try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("业务线程" + number + "准备进行写操作"); DataSourceHolder.setDataSourceKey("write"); try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("业务线程" + number + "写操作对应的数据库是:" + name2DataSource.get(DataSourceHolder.getDataSourceKey())); DataSourceHolder.clearDataSourceKey(); }}
ThreadLocal在使用完之后一定要手动threadlocal.remove()。原因有二: 1,如果使用不当会造成内存泄漏。线程类Thread中都有一份ThreadLocalMap的变量用来存储线程本地变量。由于ThreadLocalMap的生命周期跟Thread一样长,如果使用完之后没有手动threadlocal.remove()删除则会产生内存泄漏。 2,使用线程池的使用,线程是反复利用的资源,回收前的线程的副本变量会可能对再次时造成影响。
so,正确使用ThreadLocal的姿势要注意两点: 1,ThreadLocal设置为类的静态变量。这样就只维持一份。 2,set(T value)设置,get()使用之后,一定要记得remove()删除。参考资料:
- 以上内容为笔者日常琐屑积累,已无从考究引用。如果有,请站内信提示。