当前位置:网站首页>Concurrency tool class - introduction and use of CountDownLatch, CyclicBarrier, Semaphore, Exchanger
Concurrency tool class - introduction and use of CountDownLatch, CyclicBarrier, Semaphore, Exchanger
2022-08-10 05:09:00 【Xiaoyu who loves programming】
CountDownLatch、CyclicBarrier、Semaphore、Exchanger
等待多线程完成的CountDownLatch
CountDownLatch类似于join,Is to make the current thread wait for other threads to end.
join的实现原理,Constantly check if the thread is alive.
while (isAlive()) {
wait(0);
}
CountDownLatch 与 join 的区别:
CountDownLatchThe waiting thread can be made to only wait for the completion of a certain step of the worker thread,There is no need to let the worker threads all finish executing.
实例:
public class CountDownLatchTest {
staticCountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}
}).start();
c.await();
System.out.println("3");
}
}
CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N.
当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零.由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤.用在多个线程时,只需要把这个 CountDownLatch的引用传递到线程里即可.
同步屏障CyclicBarrier
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier).
The effect is to let a 组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行.(游戏匹配)
CyclicBarrier简介
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),Its parameter represents the barrier拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞.
实例:
public class CyclicBarrierTest {
staticCyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
}
输出:
1
2
或者
2
1
如果CyclicBarrier的参数为3,那么会一直阻塞,Because none of the three threads arrive.
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,如下列代码所示.
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest2 {
static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
static class A implements Runnable {
@Override
public void run() {
System.out.println(3);
}
}
}
输出:
3
1
2
CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置
CyclicBarrier能处理更为复杂的业务场景.例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次.
CyclicBarrier还提供其他有用的方法
比如getNumberWaiting方法可以获得Cyclic-Barrier阻塞的线程数量.isBroken()方法用来了解阻塞的线程是否被中断.
控制并发线程数的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源.
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorServicethreadPool = Executors
.newFixedThreadPool(THREAD_COUNT);
//只有10个数据库连接
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (inti = 0; i< THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
Semaphore的用法也很简单,首先线程使用Semaphore的acquire方法获取一个许可证,使用完之后调用release()方法归还许可证.还可以用tryAcquire方法尝试获取许可证.
线程间交换数据的Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类.Exchanger用于进行线程间的数据交换.
- 它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据.这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产 出来的数据传递给对方.
Exchanger的应用场景:
- Exchanger可以用于遗传算法:遗传算法里需要选出两个人作为交配对象,这时候会交换 两人的数据,并使用交叉规则得出2个交配结果
- Exchanger也可以用于校对工作:比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致
public class ExchangerTest {
private static final Exchanger<String>exgr = new Exchanger<String>();
private static ExecutorServicethreadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "银行流水A"; // A录入银行流水数据
exgr.exchange(A);
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "银行流水B"; // B录入银行流水数据
String A = exgr.exchange("B");
System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
+ A + ",B录入是:" + B);
} catch (InterruptedException e) {
}
}
});
threadPool.shutdown();
}
}
如果两个线程有一个没有执行exchange方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长.
参考资料:《Java并发编程的艺术》
边栏推荐
- What are the common commands of mysql
- Consulting cdc 2.0 for mysql does not execute flush with read lock. How to ensure bin
- 【LeetCode】41. The first missing positive number
- An article to master the entire JVM, JVM ultra-detailed analysis!!!
- [Web3 Series Development Tutorial - Create Your First NFT (7)] Create an NFT DApp and assign attributes to your NFT, such as pictures
- flinksql怎么写redis的value只有最后一个字段?
- SQL数据库字段追加到主表
- 抽象问题方法论
- mysql cdc (2.1.1)inital snapshot数据库的时候设置了5个并发度,se
- openvino 安装(01)
猜你喜欢
基于 EasyCV 复现 DETR 和 DAB-DETR,Object Query 的正确打开方式
How Current Probes Set Oscilloscope Parameters
【心理学·人物】第二期(学术X综艺)
How cursors work in Pulsar
2022 T Elevator Repair Exam Questions and Mock Exams
EasyGBS connects to mysql database and prompts "can't connect to mysql server", how to solve it?
一篇文章掌握整个JVM,JVM超详细解析!!!
Stacks and Queues | Valid parentheses, delete all adjacent elements in a string, reverse Polish expression evaluation, maximum sliding window, top K high frequency elements | leecode brush questions
How to improve product quality from the code layer
strongest brain (1)
随机推荐
小影科技IPO被终止:年营收3.85亿 五岳与达晨是股东
Kubernetes资源编排系列之一: Pod YAML篇
FPGA engineer interview questions collection 31~40
An article to master the entire JVM, JVM ultra-detailed analysis!!!
Rpc interface stress test
ORA-16018 异常处理记录
基于 EasyCV 复现 DETR 和 DAB-DETR,Object Query 的正确打开方式
网络层与数据链路层
Rpc接口压测
关于rust的mongodb驱动count方法无法与near条件一同使用的问题
SQLSERVER 2008 解析 Json 格式数据
FPGA工程师面试试题集锦11~20
抽象问题方法论
When oracle cdc, set the parallelism to 2 and the number of slots to 1, and the final task has only one tm. Is it because oracle does not support concurrency
请教一下各位大佬。CDC社区中FlinkCDC2.2.0版本有说明支持的sqlserver版本 ,请
Why are negative numbers in binary represented in two's complement form - binary addition and subtraction
The sword refers to Offer 033. Variation array
WAN技术-1广域网接口
FPGA工程师面试试题集锦21~30
LeetCode·124.二叉树中的最大路径和·递归