当前位置:网站首页>Concurrency tool class - introduction and use of CountDownLatch, CyclicBarrier, Semaphore, Exchanger
Concurrency tool class - introduction and use of CountDownLatch, CyclicBarrier, Semaphore, Exchanger
2022-08-10 05:09:00 【Xiaoyu who loves programming】
CountDownLatch、CyclicBarrier、Semaphore、Exchanger
等待多线程完成的CountDownLatch
CountDownLatch类似于join,Is to make the current thread wait for other threads to end.
join的实现原理,Constantly check if the thread is alive.
while (isAlive()) {
wait(0);
}
CountDownLatch 与 join 的区别:
CountDownLatchThe waiting thread can be made to only wait for the completion of a certain step of the worker thread,There is no need to let the worker threads all finish executing.
实例:
public class CountDownLatchTest {
staticCountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}
}).start();
c.await();
System.out.println("3");
}
}
CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N.
当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零.由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤.用在多个线程时,只需要把这个 CountDownLatch的引用传递到线程里即可.
同步屏障CyclicBarrier
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier).
The effect is to let a 组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行.(游戏匹配)
CyclicBarrier简介
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),Its parameter represents the barrier拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞.
实例:
public class CyclicBarrierTest {
staticCyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
}
输出:
1
2
或者
2
1
如果CyclicBarrier的参数为3,那么会一直阻塞,Because none of the three threads arrive.
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,如下列代码所示.
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest2 {
static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
static class A implements Runnable {
@Override
public void run() {
System.out.println(3);
}
}
}
输出:
3
1
2
CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置
CyclicBarrier能处理更为复杂的业务场景.例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次.
CyclicBarrier还提供其他有用的方法
比如getNumberWaiting方法可以获得Cyclic-Barrier阻塞的线程数量.isBroken()方法用来了解阻塞的线程是否被中断.
控制并发线程数的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源.
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorServicethreadPool = Executors
.newFixedThreadPool(THREAD_COUNT);
//只有10个数据库连接
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (inti = 0; i< THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
Semaphore的用法也很简单,首先线程使用Semaphore的acquire方法获取一个许可证,使用完之后调用release()方法归还许可证.还可以用tryAcquire方法尝试获取许可证.
线程间交换数据的Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类.Exchanger用于进行线程间的数据交换.
- 它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据.这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产 出来的数据传递给对方.
Exchanger的应用场景:
- Exchanger可以用于遗传算法:遗传算法里需要选出两个人作为交配对象,这时候会交换 两人的数据,并使用交叉规则得出2个交配结果
- Exchanger也可以用于校对工作:比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致
public class ExchangerTest {
private static final Exchanger<String>exgr = new Exchanger<String>();
private static ExecutorServicethreadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "银行流水A"; // A录入银行流水数据
exgr.exchange(A);
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "银行流水B"; // B录入银行流水数据
String A = exgr.exchange("B");
System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
+ A + ",B录入是:" + B);
} catch (InterruptedException e) {
}
}
});
threadPool.shutdown();
}
}
如果两个线程有一个没有执行exchange方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长.
参考资料:《Java并发编程的艺术》
边栏推荐
猜你喜欢

Why are negative numbers in binary represented in two's complement form - binary addition and subtraction

leetcode每天5题-Day12

法定代表人和股东是什么关系

线程(中):线程安全

一篇文章带你搞懂什么是幂等性问题?如何解决幂等性问题?

Hezhou ESP32C3 +1.8"tft network clock under Arduino framework

Matlab simulation of multi-factor house price prediction based on BP neural network

Shell编程三剑客之awk

【无标题】

西门子Step7和TIA软件“交叉引用”的使用
随机推荐
【心理学·人物】第二期(学术X综艺)
leetcode每天5题-Day10
OAuth2的使用场景、常见误区、使用案例
awk of the Three Musketeers of Shell Programming
LeetCode 301. Remove Invalid Parentheses BFS
LeetCode·1413.逐步求和得到正数的最小值·贪心
Promise原理及实现
From entry to mastery of PHPCMS imitation station, Xiaobai is enough to watch this set of courses
【LeetCode】41. The first missing positive number
2022 T Elevator Repair Exam Questions and Mock Exams
Matlab simulation of multi-factor house price prediction based on BP neural network
Depth of carding: prevent model fitting method
成为黑客不得不学的语言,看完觉得你们还可吗?
什么是SRM?有什么作用?在企业管理中能实现哪些功能?
深度学习之-01
Hezhou ESP32C3 +1.8"tft network clock under Arduino framework
Consulting cdc 2.0 for mysql does not execute flush with read lock. How to ensure bin
ECMAScript6 Proxy和Reflect 对象操作拦截以及自定义
The time for flinkcdc to read pgsql is enlarged. Does anyone know what happened? gmt_create':1
元宇宙 | 你能通过图灵测试吗?