当前位置:网站首页>Concurrency tool class - introduction and use of CountDownLatch, CyclicBarrier, Semaphore, Exchanger
Concurrency tool class - introduction and use of CountDownLatch, CyclicBarrier, Semaphore, Exchanger
2022-08-10 05:09:00 【Xiaoyu who loves programming】
CountDownLatch、CyclicBarrier、Semaphore、Exchanger
等待多线程完成的CountDownLatch
CountDownLatch类似于join,Is to make the current thread wait for other threads to end.
join的实现原理,Constantly check if the thread is alive.
while (isAlive()) {
wait(0);
}
CountDownLatch 与 join 的区别:
CountDownLatchThe waiting thread can be made to only wait for the completion of a certain step of the worker thread,There is no need to let the worker threads all finish executing.
实例:
public class CountDownLatchTest {
staticCountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}
}).start();
c.await();
System.out.println("3");
}
}
CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N.
当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零.由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤.用在多个线程时,只需要把这个 CountDownLatch的引用传递到线程里即可.
同步屏障CyclicBarrier
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier).
The effect is to let a 组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行.(游戏匹配)
CyclicBarrier简介
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),Its parameter represents the barrier拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞.
实例:
public class CyclicBarrierTest {
staticCyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
}
输出:
1
2
或者
2
1
如果CyclicBarrier的参数为3,那么会一直阻塞,Because none of the three threads arrive.
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,如下列代码所示.
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest2 {
static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
static class A implements Runnable {
@Override
public void run() {
System.out.println(3);
}
}
}
输出:
3
1
2
CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置
CyclicBarrier能处理更为复杂的业务场景.例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次.
CyclicBarrier还提供其他有用的方法
比如getNumberWaiting方法可以获得Cyclic-Barrier阻塞的线程数量.isBroken()方法用来了解阻塞的线程是否被中断.
控制并发线程数的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源.
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorServicethreadPool = Executors
.newFixedThreadPool(THREAD_COUNT);
//只有10个数据库连接
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (inti = 0; i< THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
Semaphore的用法也很简单,首先线程使用Semaphore的acquire方法获取一个许可证,使用完之后调用release()方法归还许可证.还可以用tryAcquire方法尝试获取许可证.
线程间交换数据的Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类.Exchanger用于进行线程间的数据交换.
- 它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据.这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产 出来的数据传递给对方.
Exchanger的应用场景:
- Exchanger可以用于遗传算法:遗传算法里需要选出两个人作为交配对象,这时候会交换 两人的数据,并使用交叉规则得出2个交配结果
- Exchanger也可以用于校对工作:比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致
public class ExchangerTest {
private static final Exchanger<String>exgr = new Exchanger<String>();
private static ExecutorServicethreadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "银行流水A"; // A录入银行流水数据
exgr.exchange(A);
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "银行流水B"; // B录入银行流水数据
String A = exgr.exchange("B");
System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
+ A + ",B录入是:" + B);
} catch (InterruptedException e) {
}
}
});
threadPool.shutdown();
}
}
如果两个线程有一个没有执行exchange方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长.
参考资料:《Java并发编程的艺术》
边栏推荐
- Joomla漏洞复现
- leetcode每天5题-Day13
- oracle cdc时,设置并行度2插槽数1,最终任务只有一个tm,是不是因为oracle不支持并发
- An article to master the entire JVM, JVM ultra-detailed analysis!!!
- 二进制中负数为何要用补码形式来表示——二进制加减法
- Consulting cdc 2.0 for mysql does not execute flush with read lock. How to ensure bin
- FPGA engineer interview questions collection 11~20
- 【无标题】
- Depth of carding: prevent model fitting method
- 深度学习——循环神经网络RNN 未完待续
猜你喜欢
随机推荐
How to choose the right oscilloscope probe in different scenarios
Ueditor editor arbitrary file upload vulnerability
RadiAnt DICOM Viewer 2022.1 Crack
Joomla漏洞复现
咨询cdc 2.0 for mysql不执行flush with read lock.怎么保证bin
Guys, is it normal that the oracle archive log grows by 3G in 20 minutes after running cdc?
软考考生注意!2022年下半年报名详细流程来了!
最强大脑(1)
SQL数据库字段追加到主表
Shell编程三剑客之awk
FPGA工程师面试试题集锦41~50
FPGA工程师面试试题集锦1~10
2022 R2 transportable pressure vessel filling operation examination question bank simulation platform
FPGA engineer interview questions collection 31~40
深度梳理:防止模型过拟合的方法汇总
如何从代码层提高产品质量
How cursors work in Pulsar
Acwing 59. 把数字翻译成字符串 计数类DP
mysql常用命令有什么
22牛客多校3 A.Ancestor(LCA + 枚举)









