当前位置:网站首页>并发工具类——CountDownLatch、CyclicBarrier、Semaphore、Exchanger的介绍与使用
并发工具类——CountDownLatch、CyclicBarrier、Semaphore、Exchanger的介绍与使用
2022-08-10 05:05:00 【热爱编程的小宇】
CountDownLatch、CyclicBarrier、Semaphore、Exchanger
等待多线程完成的CountDownLatch
CountDownLatch类似于join,就是使当前线程等待其它线程结束。
join的实现原理,不停的检查线程是否存活。
while (isAlive()) {
wait(0);
}
CountDownLatch 与 join 的区别:
CountDownLatch可以让等待的线程只等待工作线程某一步骤结束即可,不需要让工作线程全部执行完毕。
实例:
public class CountDownLatchTest {
staticCountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}
}).start();
c.await();
System.out.println("3");
}
}
CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。
当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个 CountDownLatch的引用传递到线程里即可。
同步屏障CyclicBarrier
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。
作用是让一 组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。(游戏匹配)
CyclicBarrier简介
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。
实例:
public class CyclicBarrierTest {
staticCyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
}
输出:
1
2
或者
2
1
如果CyclicBarrier的参数为3,那么会一直阻塞,因为没有三个线程到达。
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,如下列代码所示。
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest2 {
static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
static class A implements Runnable {
@Override
public void run() {
System.out.println(3);
}
}
}
输出:
3
1
2
CyclicBarrier和CountDownLatch的区别
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置
CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。
CyclicBarrier还提供其他有用的方法
比如getNumberWaiting方法可以获得Cyclic-Barrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。
控制并发线程数的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorServicethreadPool = Executors
.newFixedThreadPool(THREAD_COUNT);
//只有10个数据库连接
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (inti = 0; i< THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
}
}
});
}
threadPool.shutdown();
}
}
Semaphore的用法也很简单,首先线程使用Semaphore的acquire方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以用tryAcquire方法尝试获取许可证。
线程间交换数据的Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。
- 它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产 出来的数据传递给对方。
Exchanger的应用场景:
- Exchanger可以用于遗传算法:遗传算法里需要选出两个人作为交配对象,这时候会交换 两人的数据,并使用交叉规则得出2个交配结果
- Exchanger也可以用于校对工作:比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致
public class ExchangerTest {
private static final Exchanger<String>exgr = new Exchanger<String>();
private static ExecutorServicethreadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String A = "银行流水A"; // A录入银行流水数据
exgr.exchange(A);
} catch (InterruptedException e) {
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
String B = "银行流水B"; // B录入银行流水数据
String A = exgr.exchange("B");
System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
+ A + ",B录入是:" + B);
} catch (InterruptedException e) {
}
}
});
threadPool.shutdown();
}
}
如果两个线程有一个没有执行exchange方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长。
参考资料:《Java并发编程的艺术》
边栏推荐
- Rpc接口压测
- 【裴蜀定理】CF1055C Lucky Days
- ORA-16018 异常处理记录
- flinksql怎么写redis的value只有最后一个字段?
- 咨询cdc 2.0 for mysql不执行flush with read lock.怎么保证bin
- 在vscode中屏蔽Alt热键
- 2022山东省安全员C证考试题及模拟考试
- FPGA工程师面试试题集锦11~20
- Ask you guys.The FlinkCDC2.2.0 version in the CDC community has a description of the supported sqlserver version, please
- openvino 安装(01)
猜你喜欢
随机推荐
Guys, is it normal that the oracle archive log grows by 3G in 20 minutes after running cdc?
抽象问题方法论
pytorch 学习
小影科技IPO被终止:年营收3.85亿 五岳与达晨是股东
2022 security officer C certificate test and simulation test in shandong province
栈与队列 | 有效的括号、删除字符串中的所有相邻元素、逆波兰表达式求值、滑动窗口的最大值、前K个高频元素 | leecode刷题笔记
Shell编程三剑客之awk
aliases节点分析
关于rust的mongodb驱动count方法无法与near条件一同使用的问题
SQL Server查询优化
ORA-16018 异常处理记录
各位大佬,idea中测试使用FlinkCDC SQL 读取Mysql 数据写入Kafka中,代码中创
解决“File has been changed outside the editor, reload?”提示
LeetCode 301. Remove Invalid Parentheses BFS
The time for flinkcdc to read pgsql is enlarged. Does anyone know what happened? gmt_create':1
`id` bigint(20) unsigned NOT NULL COMMENT '数据库主键',
众昂矿业:萤石下游需求强劲
FPGA工程师面试试题集锦21~30
JavsSE => 多态
cmake









