当前位置:网站首页>并发编程第10篇,CountDownLatch(计数器)和Semaphore(信号量)
并发编程第10篇,CountDownLatch(计数器)和Semaphore(信号量)
2022-08-08 23:36:00 【进击的豆子】
Semaphore信号量底层原理
Semaphore用于限制可以访问某些资源(物理或逻辑的)的线程数目,他维护了一个许可证集合,有多少资源需要限制就维护多少许可证集合,假如这里有N个资源,那就对应于N个许可证,同一时刻也只能有N个线程访问。一个线程获取许可证就调用acquire方法,用完了释放资源就调用release方法。
可以简单理解为Semaphore信号量可以实现对接口限流,底层是基于aqs实现
Semaphore简单用法
Semaphore semaphore = new Semaphore(5); for (int i = 0; i < 6; i++) { new Thread(() -> { try { // 获取凭证 状态-1 semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + ",抢到了凭据"); try { Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + ",释放凭据"); // 状态+1 semaphore.release(); } catch (Exception e) { } }).start(); }
|
Semaphore工作原理
- 可以设置Semaphore信号量的 状态state值为5
- 当一个线程获取到锁的情况下,将state-1,锁释放成功之后state+1;
- 当state<0,表示没锁的资源,则当前线程阻塞。
手写Semaphore信号量
public class MayiktSemaphore { private Sync sync; public MayiktSemaphore(int count) { this.sync = new Sync(count); } public void acquire() { //获取锁 sync.acquireShared(1); } public void release() { sync.releaseShared(1); } class Sync extends AbstractQueuedSynchronizer { public Sync(int count) { setState(count); } @Override protected int tryAcquireShared(int arg) { for (; ; ) { int oldState = getState(); int newState = oldState - arg; // -1 if (newState < 0 || compareAndSetState(oldState, newState)) { return newState; } } } @Override protected boolean tryReleaseShared(int arg) { for (; ; ) { int oldState = getState(); int newState = oldState + arg; if (newState < oldState) { throw new Error("Maximum permit count exceeded"); } if (compareAndSetState(oldState, newState)) { return true; } } } } public static void main(String[] args) { MayiktSemaphore mayiktSemaphore = new MayiktSemaphore(5); for (int i = 0; i <= 6; i++) { new Thread(() -> { mayiktSemaphore.acquire(); System.out.println(Thread.currentThread().getName() + ",抢到了凭据"); // mayiktSemaphore.release(); }).start(); } } }
|
CountDownLatch
CountDownLatch(计数器)、CyclicBarrier(回环屏障)、信号量(Semaphore)
CountDownLatch源码分析
CountDownLatch是一种java.util.concurrent包下一个同步工具类,它允许一个或多个线程等待直到在其他线程中一组操作执行完成。 和join方法非常类似
CountDownLatch底层是基于AQS实现的
CountDownLatch countDownLatch=new CountDownLatch(2) AQS的state状态为2
调用countDownLatch.countDown();方法的时候状态-1 当AQS状态state为0的情况下,则唤醒正在等待的线程。
CountDownLatch与Join的区别
Join底层是基于wait方法实现,而CountDownLatch底层是基于AQS实现。
CountDownLatch简单案例
CountDownLatch countDownLatch = new CountDownLatch(1); new Thread(() -> { try { System.out.println("t1开始执行.."); countDownLatch.await(); System.out.println("t1结束执行.."); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1").start(); countDownLatch.countDown();
|
基于AQS手写CountDownLatch
基于aqs手写CountDownLatch思路:假设状态设置为2,也就是状态为0的时候才可以唤醒正在等待的线程。
public class MayiktCountDownLatch { private Sync sync; private MayiktCountDownLatch(int count) { sync = new Sync(count); } public void await() { sync.acquireShared(1); } public void countDown() { sync.releaseShared(1); } class Sync extends AbstractQueuedSynchronizer { public Sync(int count) { setState(count); } @Override protected int tryAcquireShared(int arg) { return getState() > 0 ? -1 : 1; } @Override protected boolean tryReleaseShared(int arg) { for (; ; ) { int oldState = getState(); int newSate = oldState - arg; if (compareAndSetState(oldState, newSate)) { return true; } } } } public static void main(String[] args) throws InterruptedException { MayiktCountDownLatch mayiktCountDownLatch = new MayiktCountDownLatch(2); new Thread(() -> { System.out.println(Thread.currentThread().getName() + ",1"); mayiktCountDownLatch.await(); System.out.println(Thread.currentThread().getName() + ",2"); }).start(); // Thread.sleep(3000); mayiktCountDownLatch.countDown(); mayiktCountDownLatch.countDown(); ; } }
|
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 如果tryAcquireShared返回小于0的情况下,可以让当前 线程变为阻塞可中断。 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } 所以需要重写方法 /** * 如果状态小于=0的情况下 则不需要阻塞线程,如果>0的情况下,则需要阻塞当前线程 * * @param arg * @return */ @Override protected int tryAcquireShared(int arg) { return getState() <= 0 ? 1 : -1; } public final boolean releaseShared(int arg) { // 如果状态为0的情况下,则调用doReleaseShared唤醒正在等待的线程 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int arg) { for (; ; ) { int state = getState(); //如果 state 已经小于等于0了,说明等待线程已经被唤醒了。就不用这个线程来再次唤醒了 if (state <= 0) return false; int nextS = state - arg; if (compareAndSetState(state, nextS)) { return nextS <= 0;//这时如果小于等于0,就应该去唤醒等待线程了 } } }
|
边栏推荐
猜你喜欢
随机推荐
获取一星期前的时间 n-1 为含当天 一周7天 7-1
最详树莓派4B装机流程及ifconfig不到wlan0的解决办法
Manacher(求解最长回文子串)
Codeforces Round #738 (Div. 2) E
抽象内部类
2022牛客多校六 B-Eezie and Pie (dfs)
MySQL 原理与优化,Group By 优化 技巧
不躺平,然后做到极致,就是最大的“安全感”
获取半年前当月的开始时间及当天结束时间
获取当前一周时间 不含当天
stm32 uses spi1 to read data from dma in slave mode
ViewOverlay与ViewGroupOverlay
MES对接Simba实现展讯平台 IMEI 写号与耦合测试
php convert timestamp to just, minutes ago, hours ago, days ago format
2022牛客多校六 J-Number Game(简单推理)
(2022杭电多校六)1012-Loop(单调栈+思维)
MySQL 高级知识【 MyISAM 读锁写锁限制】
11 Spark on RDD CheckPoint
2022杭电多校六 1009-Map (巴那赫不动点)
时间对象的格式化