博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
我理解的Java并发基础(五):并发工具类和ThreadLocal
阅读量:5873 次
发布时间:2019-06-19

本文共 15746 字,大约阅读时间需要 52 分钟。

hot3.png

J.U.C并发包提供了几个非常有用的、用于并发流程控制的CountDownLatch、CyclicBarrier、Semaphore、类等。

1,CountDownLatch,闭锁。实现类似计数器的功能。CountDownLatch的常用API如下:

CountDownLatch(int count) // 构造方法,接受一个int类型参数表示总计数void await() throws InterruptedException // 阻塞当前线程,直到计数=0,或者线程被中断boolean await(long timeout, TimeUnit unit) throws InterruptedException // 一段时间内阻塞当前线程。如果计数=0被唤醒则返回true,如果超时被唤醒则返回falsevoid countDown() // 将计数-1long getCount() // 获取当前计数值,常用于debug

举个例子。10个运动员比赛百米赛跑,只有这10个运动员都准备好之后,发令员才能发枪。

public class MyTest4CountDownLatch {        public static void main(String[] args) {        int playerNumbs = 10; // 运动员数量        CountDownLatch cdl = new CountDownLatch(playerNumbs); // 创建一个CountDownLatch对象(cdl的初始计数=10)                Thread starter = new Starter(cdl); // 创建一个发令员        List
players = new ArrayList<>(playerNumbs); // 创建10个运动员(放到一个list集合中方便操作) for (int i = 0; i < playerNumbs; i++) { Thread player = new Player(i, cdl); players.add(player); } starter.start(); for (int i = 0; i < playerNumbs; i++) { players.get(i).start(); } } }class Player extends Thread{ private CountDownLatch cdl; private int number; Player(int number, CountDownLatch cdl){ this.cdl = cdl; this.number = number; } @Override public void run() { try { Thread.sleep((long) (Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("运动员" + number + "准备好了."); cdl.countDown(); // 该运动员准备就绪(cdl的计数-1) }}class Starter extends Thread { private CountDownLatch cdl; Starter(CountDownLatch cdl){ this.cdl = cdl; } @Override public void run() { System.out.println("发令员举起手枪,等待所有运动员准备就绪."); try { cdl.await(); // 等待所有运动员准备就绪(等待cdl的计数=0) } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("所有运动员已准备就绪,发令员发枪。"); }}

2,CyclicBarrier,循环栅栏。等待线程的数量达到目标的时候,所有等待的线程同时执行。可重置。CyclicBarrier的常用API如下:

CyclicBarrier(int parties) // 构造方法,指明需要等待的计数CyclicBarrier(int parties, Runnable barrierAction) // 构造方法,指明需要等待的计数 和 计数=0时的触发操作int getParties() // 返回初始化指定的partiesint await() throws InterruptedException, BrokenBarrierException // 阻塞当前线程。返回剩余等待计数。如果返回paties-1则表示是第1个到达。如果返回0则表示最后一个到达int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException // 一段时间内阻塞等待。响应线程中断标识。返回值同await()boolean isBroken() // 如果当前cyclicBarrier对象处于broken状态则返回truevoid reset() // 重置计数int getNumberWaiting() // 返回剩余计数

  CyclicBarrier是一个所有线程要么全通过,要么全不通过的工具。如果有线程调用await(long timeout, TimeUnit unit) 超时通过,则CyclicBarrier处于broken状态,其他别的正在等待的线程会收到InterruptedException, 后续调用await()的线程会收到BrokenBarrierException。

举个例子。地铁的修建是按分段来修建的,只有所有的分段都施工完成之后,才可以通车。

public class MyTest4CyclicBarrier {    public static void main(String[] args) throws InterruptedException {        int lineSegementNumbs = 3;        CyclicBarrier cb = new CyclicBarrier(lineSegementNumbs); // 创建CyclicBarrier对象 (cb计数=3)                for (int i = 0; i < lineSegementNumbs; i++) {            new LineSegment(i, cb).start();        }                Thread.sleep(1000);        System.out.println("CyclicBarrier重用"); // 自动重置计数                for (int i = 0; i < lineSegementNumbs; i++) {            new LineSegment(i, cb).start();        }                    }}class LineSegment extends Thread{        private int segementNum;    private CyclicBarrier cb;        LineSegment(int segementNum, CyclicBarrier cb){        this.segementNum = segementNum;        this.cb = cb;    }            @Override    public void run() {        try {            Thread.sleep((long) (Math.random() * 100));        } catch (InterruptedException e) {            e.printStackTrace();        }                System.out.println("线路段" + segementNum +"已施工完成.");        try {            cb.await(); // cb计数-1,等待cb计数=0        } catch (InterruptedException | BrokenBarrierException e) {            e.printStackTrace();        }        System.out.println("通车了...");    }    }

3,Semaphore,信号量。通常用来表示许可数量,用于限制可以访问某些资源(物理或逻辑的)的线程数目。Semaphore的常用API如下:

Semaphore(int permits) // 构造方法,指定许可数量,默认非公平Semaphore(int permits, boolean fair) // 构造方法,指定许可数量,制定是否公平获取void acquire() throws InterruptedException // 阻塞等待许可。阻塞期间响应线程的中断标识void acquireUninterruptibly() // 阻塞等待许可,不相应线程中断标识。boolean tryAcquire() // 尝试获取许可。如果失败则返回false,如果获取成功则返回trueboolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException // 一段时间内阻塞等待许可,阻塞期间响应线程的中断标识void release() // 释放一个许可// 以下API中,int类型的permits参数表示一次获取或释放多个许可void acquire(int permits) throws InterruptedException // 同 acquire()void acquireUninterruptibly(int permits) // 同 acquireUninterruptibly()boolean tryAcquire(int permits) // 同 tryAcquire()boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException // 同 tryAcquire(long timeout, TimeUnit unit)void release(int permits) // 同 release()// 以下API常用于控制、监控或debugint availablePermits() // 剩余可用的许可数量int drainPermits() // 将所有剩余许可置为0int getQueueLength() // 等待获取许可的线程数量

举个例子。桥每次最多能通过两个人,每个人通过桥的时间为10秒,桥东西两侧各有10个人同时等待准备过桥。输出每次过桥人的姓名、过桥方向和过桥时间。

public class MyTest4Semaphore {    public static void main(String[] args) throws InterruptedException {        String[] gruopEast = {"张三0", "张三1", "张三2", "张三3", "张三4", "张三5", "张三6", "张三7", "张三8", "张三9"};        String[] gruopWest = {"李四0", "李四1", "李四2", "李四3", "李四4", "李四5", "李四6", "李四7", "李四8", "李四9"};        int takeTime = 10;        CyclicBarrier cb = new CyclicBarrier(20); // 使用CyclicBarrier来达到“同时准备过桥”的目的。        Semaphore semaphore = new Semaphore(2);        for (int i = 0; i < gruopEast.length; i++) {            new Player(gruopEast[i], "西", takeTime, cb, semaphore).start();        }        for (int i = 0; i < gruopWest.length; i++) {            new Player(gruopWest[i], "东", takeTime, cb, semaphore).start();        }    }}class Player extends Thread {    private String name;    private String destination;    private int takeTimeSeconds;    private CyclicBarrier cb;    private Semaphore semaphore;    public Player(String name, String destination, int takeTimeSeconds, CyclicBarrier cb, Semaphore semaphore) {        super();        this.name = name;        this.destination = destination;        this.takeTimeSeconds = takeTimeSeconds;        this.cb = cb;        this.semaphore = semaphore;    }    @Override    public void run(){        try {            cb.await();            semaphore.acquire(); // 阻塞获取许可            System.out.println(name + "准备向" + destination + "过桥,需要花费" + takeTimeSeconds + "秒");            Thread.sleep(1000 * takeTimeSeconds);            semaphore.release(); // 释放许可        } catch (InterruptedException e) {            // TODO Auto-generated catch block            e.printStackTrace();        } catch (BrokenBarrierException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }    }}

4,Phaser,JDK1.7新增,功能上比CyclicBarrier、CountDownLatch更强大,提供更为丰富的API。常用于多线程参与多阶段完成的场景。在不同阶段,可以等待不同数量人的人完成,再进入下一阶段。在进入下一个阶段的时候,用户还可以重写onAdvance()来实现更佳自定义更加灵活的场景。Phaser的常用API如下:

Phaser(int parties) // 构造方法,指明有多少个线程参与int register() // 当前线程注册到当前阶段int arrive() // 当前线程已完成自己的工作int arriveAndDeregister() // 当前线程已完成自己的工作,并取消注册(不参与下一个阶段)。int arriveAndAwaitAdvance()  // 当前线程已完成自己的工作,等待进入一下个阶段(参与下一个阶段)int awaitAdvance(int phase) // 阻塞等待第phase个阶段int awaitAdvanceInterruptibly(int phase) //  阻塞等待第phase个阶段,阻塞期间响应线程中断标识int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit) // 阻塞等待第phase个阶段一段时间,阻塞期间响应线程中断标识void forceTermination() // 强制将该phaser对象设置为中指状态int getPhase() // 获取当前第phase个阶段int getRegisteredParties() // 获取当前阶段的注册数量int getArrivedParties() // 获取当前阶段已经完成的线程数量int getUnarrivedParties() // 获取当前阶段还未完成的线程数量boolean isTerminated() // 判断该phaser是否可用boolean onAdvance(int phase, int registeredParties) // protected方法,当一个阶段完成后触发,进入下一个阶段之前的动作。

  每个Phaser实例都会维护一个phase number,初始值为0。每当所有注册的任务都到达Phaser时,phase number累加,并在超过Integer.MAX_VALUE后清零。arrive()和arriveAndDeregister()方法用于记录到达;其中arrive(),某个参与者完成任务后调用;arriveAndDeregister(),任务完成,取消自己的注册。arriveAndAwaitAdvance(),自己完成等待其他参与者完成,进入阻塞,直到Phaser成功进入下个阶段。

举个例子。有一个项目,分为4个阶段完成。第一个阶段有工人A,工人B,工人C共同参与。第二阶段由工人C、工人D共同完成,第三阶段由工人B、工人E共同完成,第四阶段由工人A、B、C共同完成。前一个阶段完成之后,才能进入下一个阶段的工作。每个阶段完成之后,先向经理汇报,然后进入下一阶段。

public class MyTest4Phaser {    public static void main(String[] args) {        Phaser phaser = new MyProject(5);        int[] workerAPhaseArray = new int[] {0, 3};        int[] workerBPhaseArray = new int[] {0, 2, 3};        int[] workerCPhaseArray = new int[] {0, 1, 3};        int[] workerDPhaseArray = new int[] {1};        int[] workerEPhaseArray = new int[] {2};        int phaseAmount = 4;        Worker workerA = new Worker("A", workerAPhaseArray, phaser, phaseAmount);        Worker workerB = new Worker("B", workerBPhaseArray, phaser, phaseAmount);        Worker workerC = new Worker("C", workerCPhaseArray, phaser, phaseAmount);        Worker workerD = new Worker("D", workerDPhaseArray, phaser, phaseAmount);        Worker workerE = new Worker("E", workerEPhaseArray, phaser, phaseAmount);        workerA.start();        workerB.start();        workerC.start();        workerD.start();        workerE.start();    }}class MyProject extends Phaser {    public MyProject(int parties){        super(parties);    }    @Override    public boolean onAdvance(int currenPphase, int registeredParties) {        if (currenPphase < 3) {            System.out.println("通知经理:已经完成了第" + currenPphase + "阶段任务,准备执行第" + (currenPphase + 1) + "阶段任务。");        }else {            System.out.println("通知经理:已完成了所有任务");        }        return registeredParties == 0;    }}class Worker extends Thread {    protected String name;    protected Phaser phaser;    private int[] phaseArray;    private int phaseAmount;    public Worker (String name,int[] phaseArray, Phaser phaser, int phaseAmount){        super(name);        this.name = name;        this.phaseArray = phaseArray;        this.phaser = phaser;        this.phaseAmount = phaseAmount;        if (phaseArray == null || phaseArray.length == 0) throw new RuntimeException("工人参与的阶段错误");    }    public void doWork(){        Set
set = intArray2Set(phaseArray); int lastPhase = phaseArray[phaseArray.length-1]; for (int i = 0; i < phaseAmount && (!phaser.isTerminated()); i++) { int currentPhase = phaser.getPhase(); if (set.contains(currentPhase)) { outPrint(currentPhase); if (lastPhase == currentPhase) { phaser.arriveAndDeregister(); break; }else { phaser.arriveAndAwaitAdvance(); } }else{ phaser.arriveAndAwaitAdvance(); } } } public Set
intArray2Set(int[] phaseArray){ return Arrays.stream(phaseArray).boxed().collect(Collectors.toSet()); } protected void outPrint(int i){ try { System.out.println("工人" + name + "正在执行第" + i + "阶段任务"); Thread.sleep(1000); System.out.println("工人" + name + "执行完成第" + i + "阶段任务"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void run() { doWork(); }}

5,Exchanger<V>,线程间的数据交换器。两个线程在一个安全点彼此交换数据。该类比较简单,就3个API:

public Exchanger() // V exchange(V x) throws InterruptedException // V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException //

  第一个调用exchange()的线程会阻塞等待,直到第二个线程调用exchange()来完成彼此数据的交换。

举个例子。飞机驾驶员有主飞和副飞,重要消息需要二者互相确认的。

public class MyTest4Exchanger {    public static void main(String[] args) {                Exchanger
exchanger = new Exchanger<>(); Thread primary = new Pilot("主飞", "10分钟后降落", exchanger); Thread secondary = new Pilot("副飞", "10分钟后降落", exchanger); primary.start(); secondary.start(); }}class Pilot extends Thread { private String pilotName; private String receivedMsg; private Exchanger
exchanger; Pilot(String pilotName, String receivedMsg, Exchanger
exchanger){ this.pilotName = pilotName; this.receivedMsg = receivedMsg; this.exchanger = exchanger; } @Override public void run() { String ownReceivedMsg = receivedMsg; String otherReceivedMsg = null; try { otherReceivedMsg = exchanger.exchange(ownReceivedMsg); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(pilotName + "汇报:" + (ownReceivedMsg.equals(otherReceivedMsg) ? "消息一致." : "不消息一致.")); } }

  以上介绍的5个工具类均为线程间的互相通讯的工具类。还有一个线程私有的工具类,ThreadLocal。不过,ThreadLocal是存在于java.lang包下的。

6,ThreadLocal<T>,本地线程变量。ThreadLocal为每个使用该变量的线程提供独立的变量副本,所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。ThreadLocal的API也比较简单:

ThreadLocal() // 构造方法T get() // 获取当前线程中存储的本地变量void set(T value) // 将value设置到当前线程的本地变量中存储void remove() // 删除当前线程中存储的本地变量

举个例子。在多数据源处理中,以读写分离为例,可以将数据源的标识放到ThreadLocal中,使用aop来自动完成切换工作。本例就简单的模拟一下。(例子中不采用AOP了,直接代码中体现)

public class MyTest4ThreadLocal {        public static void main(String[] args) {                Map
name2DataSource = new HashMap<>(); // 缓存每个datasource,value以String代替 name2DataSource.put("read", "ReadDataBase"); name2DataSource.put("write", "WriteDataBase"); for(int i = 0; i < 10; i++){ new BussinessThread(i, name2DataSource).start(); } }}class DataSourceHolder { private static final ThreadLocal
dataSources = new ThreadLocal<>(); public static void setDataSourceKey(String customType) { dataSources.set(customType); } public static String getDataSourceKey() { return (String) dataSources.get(); } public static void clearDataSourceKey() { dataSources.remove(); }}class BussinessThread extends Thread { private Map
name2DataSource; private int number; BussinessThread(int number, Map
name2DataSource){ this.number = number; this.name2DataSource = name2DataSource; } @Override public void run() { System.out.println("业务线程" + number + "准备进行读操作"); DataSourceHolder.setDataSourceKey("read"); try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("业务线程" + number + "读操作对应的数据库是:" + name2DataSource.get(DataSourceHolder.getDataSourceKey())); DataSourceHolder.clearDataSourceKey(); // 再测试写操作。 try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("业务线程" + number + "准备进行写操作"); DataSourceHolder.setDataSourceKey("write"); try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("业务线程" + number + "写操作对应的数据库是:" + name2DataSource.get(DataSourceHolder.getDataSourceKey())); DataSourceHolder.clearDataSourceKey(); }}

  ThreadLocal在使用完之后一定要手动threadlocal.remove()。原因有二: 1,如果使用不当会造成内存泄漏。线程类Thread中都有一份ThreadLocalMap的变量用来存储线程本地变量。由于ThreadLocalMap的生命周期跟Thread一样长,如果使用完之后没有手动threadlocal.remove()删除则会产生内存泄漏。 2,使用线程池的使用,线程是反复利用的资源,回收前的线程的副本变量会可能对再次时造成影响。

  so,正确使用ThreadLocal的姿势要注意两点: 1,ThreadLocal设置为类的静态变量。这样就只维持一份。 2,set(T value)设置,get()使用之后,一定要记得remove()删除。

参考资料:

  • 以上内容为笔者日常琐屑积累,已无从考究引用。如果有,请站内信提示。

转载于:https://my.oschina.net/u/3466682/blog/1632744

你可能感兴趣的文章
C/C++ 多线程机制
查看>>
js - object.assign 以及浅、深拷贝
查看>>
python mysql Connect Pool mysql连接池 (201
查看>>
Boost在vs2010下的配置
查看>>
android camera(四):camera 驱动 GT2005
查看>>
一起谈.NET技术,ASP.NET伪静态的实现及伪静态的意义
查看>>
20款绝佳的HTML5应用程序示例
查看>>
string::c_str()、string::c_data()及string与char *的正确转换
查看>>
11G数据的hive初测试
查看>>
如何使用Core Text计算一段文本绘制在屏幕上之后的高度
查看>>
==和equals区别
查看>>
2010技术应用计划
查看>>
XML 节点类型
查看>>
驯服 Tiger: 并发集合 超越 Map、Collection、List 和 Set
查看>>
Winform开发框架之权限管理系统改进的经验总结(3)-系统登录黑白名单的实现...
查看>>
Template Method Design Pattern in Java
查看>>
MVC输出字符串常用四个方式
查看>>
LeetCode – LRU Cache (Java)
查看>>
JavaScript高级程序设计--对象,数组(栈方法,队列方法,重排序方法,迭代方法)...
查看>>
【转】 学习ios(必看经典)牛人40天精通iOS开发的学习方法【2015.12.2
查看>>