当前位置:网站首页>JUC Concurrent Programming

JUC Concurrent Programming

2022-08-11 08:11:00 paiidds

1.什么是JUC

JUC是java.util.concurrent的缩写,For multi-threaded concurrent programming,The purpose is to support better high-concurrency tasks.

2.进程和线程概念

何为进程?

进程是程序的一次执行过程,是系统运行程序的基本单位,因此进程是动态的.The system runs a program including the creation of a process,Run and destroy processes.

在Java程序中启动mainThe function essentially starts aJVM进程,在这里插入代码片而mainThe thread where the function is located is a thread of the process,称为主线程.

Can view the task manager to view the running process.
使用ESC+shift+ctrl
在这里插入图片描述

Java程序中默认有几个线程?

答: 两个,分别是main主线程和GC线程.

何为线程?

线程和进程相似,But a thread is a smaller unit of execution than a process.There can often be multiple threads in a process.Equivalent to solve a math problem,Math problem is like a process,The solution steps are like a thread,Through such a solution step by step, finally complete this math problem.

Unlike a process, multiple threads of the same kind will share the resources of the process's heap and method area.,And each thread has a private program counter、虚拟机栈和本地方法栈,So the system will generate a thread,or when switching work between threads,负担要比进程小得多,Threads are therefore also called lightweight processes.

Java程序天生就是多线程程序,通过JMX来看一下一个普通的java程序有哪些线程.

JMX(Java Management Extensions),中文意思是Java管理拓展,用于管理和监视Java程序.The most common is forJVMdetection and management,比如JVM内存,CPU使用率、线程数、Garbage collection, etc.

在这里插入图片描述

通过上面可知:一个JavaThe program is run bymainThread running concurrently with multiple other threads.

线程和进程的关系,区别及优缺点

基于JVM的角度分析,First look at the image below

在这里插入图片描述

线程是进程划分的更小的运行单位,The biggest difference between threads and processes is that each process runs independently,While each thread is not necessarily,Because the threads of the same process are very likely to affect each other.Thread execution overhead is small,但不利于资源的管理和保护,The process is just the opposite.

Java真的可以开启线程吗?

在这里插入图片描述

开不了,Because the underlying principle of the opened thread isC++,Need to use native method,而JavaIt is not possible to directly manipulate local resources.

并行与并发的区别

  • 并发: Two or more jobs are executed in the same time period
  • 并行: Two and two or more operations performed at the same time

为什么要使用多线程呢?

从总体来看:

  • 从计算机底层来说: 线程可以比作是轻量级的进程,是程序执行的最小单位,线程间的切换和调度的成本远远小于进程.另外,多核 CPU 时代意味着多个线程可以同时运行,这减少了线程上下文切换的开销.

  • From the current Internet age: The systems developed now are always at the level of millions or even tens of millions of concurrency.,而多线程并发编程正是开发高并发系统的基础,The use of multi-threaded concurrency mechanism can greatly improve the concurrency and performance of the system.

From the depth of the computer:

  • 单核时代: Multithreading in the single-core era is mainly to improve process utilizationCPU和IO系统的效率.Assuming only one can run Java 进程的情况,当我们请求 IO 的时候,如果 Java 进程中只有一个线程,此线程被 IO 阻塞则整个进程被阻塞.CPU 和 IO It cannot be run at the same time,那么可以简单地说系统整体效率只有 50%.当使用多线程的时候,一个线程被 IO 阻塞,其他线程还可以继续使用 CPU.从而提高了 Java 进程利用系统资源的整体效率.

  • 多核时代: Multi-threading in the multi-core era is to improve the process utilization of multi-coreCPU的能力.:假如我们要计算一个复杂的任务,我们只用一个线程的话,不论系统有几个 CPU 核心,都只会有一个 CPU 核心被利用到.而创建多个线程,这些线程可以被映射到底层多个 CPU 上执行,在任务中的多个线程没有资源竞争的情况下,任务执行的效率会有显著性的提高,约等于(单核时执行时间/CPU 核心数).

使用多线程可能带来什么问题?

The purpose of concurrent programming is to improve the execution efficiency of the program and increase the speed of the program,But concurrent programming doesn't always run fast,And concurrent programming will encounter many problems,比如:内存泄漏,死锁,线程不安全等等.

线程的状态

Look inside the threadState枚举类型

 public enum State {
        /**
         * Thread state for a thread which has not yet started.
         */
        NEW, 新建状态

        /**
         * Thread state for a runnable thread.  A thread in the runnable
         * state is executing in the Java virtual machine but it may
         * be waiting for other resources from the operating system
         * such as processor.
         */
        RUNNABLE, 运行中状态

        /**
         * Thread state for a thread blocked waiting for a monitor lock.
         * A thread in the blocked state is waiting for a monitor lock
         * to enter a synchronized block/method or
         * reenter a synchronized block/method after calling
         * {@link Object#wait() Object.wait}.
         */
        BLOCKED, 阻塞状态

        /**
         * Thread state for a waiting thread.
         * A thread is in the waiting state due to calling one of the
         * following methods:
         * <ul>
         *   <li>{@link Object#wait() Object.wait} with no timeout</li>
         *   <li>{@link #join() Thread.join} with no timeout</li>
         *   <li>{@link LockSupport#park() LockSupport.park}</li>
         * </ul>
         *
         * <p>A thread in the waiting state is waiting for another thread to
         * perform a particular action.
         *
         * For example, a thread that has called <tt>Object.wait()</tt>
         * on an object is waiting for another thread to call
         * <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
         * that object. A thread that has called <tt>Thread.join()</tt>
         * is waiting for a specified thread to terminate.
         */
        WAITING,等待状态

        /**
         * Thread state for a waiting thread with a specified waiting time.
         * A thread is in the timed waiting state due to calling one of
         * the following methods with a specified positive waiting time:
         * <ul>
         *   <li>{@link #sleep Thread.sleep}</li>
         *   <li>{@link Object#wait(long) Object.wait} with timeout</li>
         *   <li>{@link #join(long) Thread.join} with timeout</li>
         *   <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
         *   <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
         * </ul>
         */
        TIMED_WAITING, 限时等待状态

        /**
         * Thread state for a terminated thread.
         * The thread has completed execution.
         */
        TERMINATED; 终止状态
    }

在这里插入图片描述

由上图可以看出:线程创建之后它将处于 NEW(新建) 状态,调用 start() 方法后开始运行,线程这时候处于 READY(可运行) 状态.可运行状态的线程获得了 CPU 时间片(timeslice)后就处于 RUNNING(运行) 状态.

在操作系统中层面线程有 READY 和 RUNNING 状态,而在 JVM 层面只能看到 RUNNABLE 状态(图源:HowToDoInJava:Java Thread Life Cycle and Thread States),所以 Java 系统一般将这两个状态统称为 RUNNABLE(运行中) 状态 .

为什么 JVM 没有区分这两种状态呢?

现在的时分(time-sharing)多任务(multi-task)操作系统架构通常都是用所谓的“时间分片(time quantum or time slice)”方式进行抢占式(preemptive)轮转调度(round-robin式).这个时间分片通常是很小的,一个线程一次最多只能在 CPU 上运行比如 10-20ms 的时间(此时处于 running 状态),也即大概只有 0.01 秒这一量级,时间片用后就要被切换下来放入调度队列的末尾等待再次调度.(也即回到 ready 状态).线程切换的如此之快,区分这两种状态就没什么意义了.

线程执行 wait()方法之后,线程进入 WAITING(等待) 状态.进入等待状态的线程需要依靠其他线程的通知才能够返回到运行状态,而 TIMED_WAITING(超时等待) 状态相当于在等待状态的基础上增加了超时限制,比如通过 sleep(long millis)方法或 wait(long millis)方法可以将 Java 线程置于 TIMED_WAITING 状态.当超时时间到达后 Java 线程将会返回到 RUNNABLE 状态.当线程调用同步方法时,在没有获取到锁的情况下,线程将会进入到 BLOCKED(阻塞) 状态.线程在执行 Runnable 的run()方法之后将会进入到 TERMINATED(终止) 状态

什么是上下文切换?

Threads have their own running conditions and states during execution(称为上下文),For example, the program counter mentioned above,栈信息等. 当出现以下情况,Threads will be taken from possessionCPU状态中退出.

  • 主动让出 CPU,比如调用了 sleep(), wait() 等.
  • 时间片用完,因为操作系统要防止一个线程或者进程长时间占用CPUCause other threads or processes to block.
  • 调用了阻塞类型的系统中断,比如请求 IO,线程被阻塞.
  • 被终止或结束运行

这其中前三种都会发生线程切换,线程切换意味着需要保存当前线程的上下文,留待线程下次占用 CPU 的时候恢复现场.并加载下一个将要占用 CPU 的线程上下文.这就是所谓的 上下文切换.

上下文切换是现代操作系统的基本功能,因其每次需要保存信息恢复信息,这将会占用 CPU,内存等系统资源进行处理,也就意味着效率会有一定损耗,如果频繁切换就会造成整体效率低下.

什么是死锁?如何避免死锁

Deadlock is a situation in which multiple threads enter a blockage in order to compete for a resource,and the resource is insufficient or unreasonably allocated,cause the program to not terminate normally.

如下图所示,线程 A 持有资源 2,线程 B 持有资源 1,他们同时都想申请对方的资源,Therefore, these two threads will wait for each other and enter a deadlock state because they cannot get each other's resources..

在这里插入图片描述

产生死锁的四个条件

  • 互斥条件: The resource is occupied by a thread at any time
  • 请求与保持条件: A thread is blocked by requesting resources,The resources it has held will not be released.
  • 不剥夺条件: 线程已获得的资源在未使用完之前不能被其他线程强行剥夺,You can only release resources after you have finished using them.
  • 循环等待条件: Several threads form a head-to-tail loop waiting for a resource.

破坏死锁的条件

  1. Break Request and Hold Condition: 一次性申请到所有需要的资源
  2. 破坏不剥夺条件: Occupy some resources to further apply for other resources,若申请不到,release the occupied resources.
  3. 破环循环等待条件: 靠按序申请资源来预防.按某一顺序申请资源,释放资源则反序释放.破坏循环等待条件.

如何避免死锁

避免死锁就是在资源分配时,借助于算法(比如银行家算法)Make reasonable estimates of resource allocation,使其进入安全状态.

sleep()方法和wait()方法的区别和共同点?

  1. sleep方法是Thread的方法,而waitMethod A method held by any object.
  2. The main difference between the two is:sleep()方法不会释放锁,而wait()方法会释放锁.
  3. Both can suspend the execution of the thread,但wait()Used in the interaction between the threads/通信,而sleep()The method is mainly used to suspend execution.
  4. waitmethod after use,Require other threads to call the same object to wake up,使用notify()或者notifyall()方法.sleep()After the method is executed,线程会自动苏醒.或者使用wait(long timeout)The thread will also automatically wake up after the timeout.
  5. wait不需要捕获异常,而sleep需要捕获异常.

为什么我们要调用start()方法会执行run()方法,而不能直接调用run()方法?

调用start()Methods may start a thread and make it into the ready state,Just get itcpuExecution can enter the running state,At this time to work for multithreading.而直接调用run()方法,实质上是调用main线程的一个普通方法,Not a multi-threaded job.

3. Lock锁

传统synchronized

/** * 真正的多线程开发,公司中的开发,降低耦合性 * 线程就是一个单独的资源类,没有任何附属的操作! * 1、 属性、方法 */

package com.liang;

public class SaleTicketDemo1 {
    

    public static void main(String[] args) {
    
        //After starting the main thread,开启两个线程,多线程操作一个资源类
        final Ticket ticket = new Ticket();

// new Thread(()->{
    
// for (int i = 1; i < 40; i++) {
    
// ticket.sale();
// }
// },"A").start();
//
// new Thread(()->{
    
// for (int i = 1; i < 40; i++) {
    
// ticket.sale();
// }
// },"B").start();

        //使用更简洁lambada表达式,清晰易懂
        new Thread(()-> {
    for (int i = 1; i < 40; i++) ticket.sale();
        },"A").start();
        new Thread(()-> {
    
            for (int i = 1; i < 40; i++) ticket.sale();
        },"B").start();
        new Thread(()->{
    for(int i =0 ;i<40 ;i++) ticket.sale();
        },"B").start();
    }


}


//Create a ticket class,Has a ticketing function

class Ticket{
    

    //票数,Member variables for each object
    private int ticket = 30;

    //synchronizedGuaranteed code synchronization issues
    public synchronized void sale()
    {
    
        if(ticket>0){
    
            System.out.println(Thread.currentThread().getName()+"取得了第"+(ticket--)+"票");
        }
    }
}



Lock接口

在这里插入图片描述

Lock位于JUC下的接口类,实现类有三种,可重入锁,读锁,写锁,默认为非公平锁,但可以设置为公平锁.

公平锁: 按照先来后到的顺序,依次执行线程,Queuing is not allowed.
非公平锁: Queuing is allowed,Threads that arrive first may execute later,显得不公平.

使用


package com.liang;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SaleTicketDemo2 {
    
    //获取资源对象

    public static void main(String[] args) {
    
        Ticket2 ticket  = new Ticket2();

        //创建线程
        new Thread(()->{
    for(int i =1 ;i<40;i++) ticket.sale();},"A").start();
        new Thread(()->{
    for(int i =1 ;i<40;i++) ticket.sale();},"B").start();
        new Thread(()->{
    for(int i =1 ;i<40;i++) ticket.sale();},"C").start();
    }

}


//资源对象,使用锁机制Lock
class Ticket2{
    

    private int number = 30;

    //创建锁
    Lock lock = new ReentrantLock(); //可重入锁
    //new ReentrantReadWriteLock(); 读写锁


    //Operate the normal method,It is locked inside
    public  void sale()
    {
    
        try{
    
            //加锁
            lock.lock();
            if(number>0){
    
                System.out.println(Thread.currentThread().getName()+"取到了第"+(number--)+"票");
            }
        }catch (Exception e){
    
            e.printStackTrace();
        }finally {
    
            //使用完lockObject to be released
            lock.unlock();
        }
    }

}

Sychronized和Lock的区别

1.Sychronized是内置的Java关键字,Lock是Java的一个接口.
2.SychronizedNo need to manually release the lock,而LockRequires manual release of the lock.
3.SychronizedUnable to determine the dynamics of acquiring locks,而Lock可以判断是否获取到了锁.
4.SychronizedUnacquired locks will keep waiting to acquire locks,而Lock会使用tryLock()method to try to acquire the lock.
5.Sychronzied是可重入锁,不可以中断,非公平.LockAlso a sync lock,Can read locks,默认为非公平(可以自己设置).
5. SychronizedSuitable for code synchronization problems with a small number of locks,而LockSuitable for lock-heavy code synchronization problems(因为Sychronizedconsumes more system resources).

Check computer core count by code


//Auditor 核数
public class Auditor {
    

    public static void main(String[] args) {
    
        //Get computer core count
        System.out.println(Runtime.getRuntime().availableProcessors());
    }

}

4.生产者和消费者

面试常考题: 单例模式,排序算法,生产者和消费者,死锁

生产者和消费者Synchronized

/** * * 线程之间的通信: 生产者和消费者问题! 等待唤醒,通知唤醒 * 线程交替运行,Controlled by the same variable */

public class PCTest {
    

    public static void main(String[] args) {
    
        //1.Create the resource object to be manipulated
        Data data = new Data();

        //2.创建两个线程
        new Thread(()->{
    
            try {
    
                for (int i = 0; i < 10; i++) {
    
                    data.add();
                }
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
        },"A").start();

        new Thread(()->{
    
            try {
    
                for (int i = 0; i < 10; i++) {
    
                    data.remove();
                }
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
        },"B").start();


    }
}

class Data{
       //资源类
    private int number =0;

    //Synchronize production operations
    public synchronized void add() throws InterruptedException {
    
        //Resource is not empty,则进入等待
        if(number!=0){
    
            this.wait();
        }
        System.out.println(Thread.currentThread().getName()+"=>"+(number++));
        //Wake up the consumer process at this time
        this.notify();
    }
    //Synchronous consumption operations
    public synchronized  void remove() throws InterruptedException {
    
        if(number ==0){
    
            this.wait(); //Production is required if the resource is empty,Free up executive power
        }
        System.out.println(Thread.currentThread().getName()+"=>"+(number--));
        //唤醒生产线程
        this.notify();
    }
}


Two threads can meet the above requirements,实现线程之间的通信,一个负责消费,一个负责生产,Only one of the two can run,Threads that can wake up each other.

问题存在

When producer and consumer problem,使用4个线程启动时,There will be a state of data confusion,The above code does not meet the expected requirements.

在这里插入图片描述
虚假唤醒就是在多线程执行过程中,线程间的通信未按照我们幻想的顺序唤醒,Therefore, there will be data inconsistencies and other results that do not meet our expectations..比如 我的想法是:加1和减1交替执行,他却出现了2甚至3这种数.

为什么会出现虚假唤醒?

在这里插入图片描述

Data different from expected results during false wakeup,由于我们使用的if判断语句,The first time the thread enters the waiting state, it has already been executed,So when the thread wakes up again,就会跳出if方法,thereby exceeding1的数据.False wake-up exists inappropriate wake-up,won't wake up as we think.

False wakeups resolved

将if判断为while判断

解释:
while是为了再一次循环判断刚刚争抢到锁的线程是否满足继续执行下去的条件,条件通过才可以继续执行下去,不通过的线程只能再次进入wait状态,by others alive、就绪状态的线程进行争抢锁.

JUC版本的生产者和消费者问题(基于Lock)

在这里插入图片描述

在这里插入图片描述

JUCPassed when the thread under waits and the thread wakes upCondition实现类来实现的,Compared with the method that contains thread waiting and thread wakeup inside the object,JUCThread wait and thread wake oriented interface.

实现

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** * JUCThe producer and consumer problem under * 基于Lock实现,其中使用到Condition条件接口 * JUCPassed when the thread under waits and the thread wakes upCondition来实现的 */
public class PCTest1 {
    

    public static void main(String[] args) {
    

        //Get the thread resource class
        final Data2 data2 = new Data2();

        new Thread(()->{
    for (int i = 0; i < 10; i++)data2.add();},"A").start();
        new Thread(()->{
    for (int i = 0; i < 10; i++)data2.remove();},"B").start();
        new Thread(()->{
    for (int i = 0; i < 10; i++)data2.add();},"C").start();
        new Thread(()->{
    for (int i = 0; i < 10; i++)data2.remove();},"D").start();
    }
}


class Data2{
    

    private int number = 0;

    Lock lock = new ReentrantLock();  //可重入锁

    //获取条件
    Condition condition = lock.newCondition();


    //Common method internal lock

    public void add()
    {
    
        try{
    

            //加锁
            lock.lock();
            while(number!=0){
    
                condition.await();  //还有资源,调用condition的等待方法
            }
            System.out.println(Thread.currentThread().getName()+"=>"+(++number));
            //wake up another thread here
            condition.signalAll();
        }catch (Exception e){
    

        }finally {
    
            lock.unlock();
        }
    }

    public void remove()
    {
    
        try{
    

            //加锁
            lock.lock();
            while(number==0){
    
                condition.await();  //没有资源,调用condition的等待方法
            }
            System.out.println(Thread.currentThread().getName()+"=>"+(--number));
            //wake up another thread here
            condition.signalAll();
        }catch (Exception e){
    

        }finally {
    
            lock.unlock();
        }
    }
}

signal和signalAll方法的区别
void signal() :Wakes up one waiting thread. 唤醒在等待的单个线程
void signalAll():Wakes up all waiting threads. 唤醒等待的所有线程

A single process of awakening might not meet conditions,会继续等待.

ConditionPrecise notifications and wake-up threads

上面输出的结果,ConditionNot executed in the order we want,可以通过ConditionPrecise notifications and wake-up threads

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


//ConditionAccurate inform awakened threads
public class PCCondition {
    

    public static void main(String[] args) {
    
        Data3 data3 = new Data3();

        new Thread(()->{
    for (int i = 0; i < 10; i++)data3.a();},"A").start();
        new Thread(()->{
    for (int i = 0; i < 10; i++)data3.b();},"B").start();
        new Thread(()->{
    for (int i = 0; i < 10; i++)data3.c();},"C").start();
    }

}



class Data3{
    

    private int number = 1;

    Lock lock = new ReentrantLock();  //可重入锁

    //获取条件,Three conditional classes are required
    Condition condition1 = lock.newCondition();
    Condition condition2 = lock.newCondition();
    Condition condition3 = lock.newCondition();


    //Common method internal lock

    public void a()
    {
    
        //加锁
        lock.lock();
        try{
    
            while(number!=1){
    
                condition1.await();  //还有资源,调用condition的等待方法
            }
            System.out.println(Thread.currentThread().getName()+"=>"+number);
            number =2;
            //wake up another thread here
            condition2.signal();
        }catch (Exception e){
    
            e.printStackTrace();
        }finally {
    
            lock.unlock();
        }
    }

    public void b()
    {
    
        //加锁
        lock.lock();
        try{
    
            while(number!=2){
    
                condition2.await();
            }
            System.out.println(Thread.currentThread().getName()+"=>"+number);
            //wake up another thread here
            number =3;
            condition3.signal();
        }catch (Exception e){
    
            e.printStackTrace();
        }finally {
    
            lock.unlock();
        }
    }

    public void c()
    {
    
        //加锁
        lock.lock();
        try{
    
            while(number!=3){
    
                condition3.await();  //没有资源,调用condition的等待方法
            }
            System.out.println(Thread.currentThread().getName()+"=>"+(number));
            //wake up another thread here
            number =1;
            condition1.signal();
        }catch (Exception e){
    
            e.printStackTrace();
        }finally {
    
            lock.unlock();
        }
    }


}

注意: 在其他线程通过Conditionto indicate which thread is about to wake up

5.The phenomenon of eight locks

如何判断锁的是谁?

Deep understanding of locking

Eight locked is actually eight questions,The eight questions are as follows:
1.标准情况下,Two threads print first or call?

在这里插入图片描述

2.SMS delay4秒,Two threads first send text messages or phone?

在这里插入图片描述

At this time, the locks are all the same resource object

3.增加了一个普通方法后,先执行发短信还是Hello普通方法?(Ordinary methods do not need to pay attention to locks)

在这里插入图片描述
4.两个对象,两个同步方法,At this point to send text messages or phone(There is a delay in sending text messages4s)

在这里插入图片描述
The lock objects are two different resource objects,So whoever executes first will execute.

5.增加两个静态同步方法,只有一个对象,先发短信还是打电话?

在这里插入图片描述
6.两个对象!增加两个静态的同步方法,先发短信还是打电话?

在这里插入图片描述
两个对象的Class模板只有一个,staticThe static method is loadedClass模板.

7.1个静态的同步方法,1个普通的同步方法,一个对象,先发短信还是打电话

在这里插入图片描述
Two different lock object,一个是class模板,One is the current resource object

8…1个静态的同步方法,1个普通的同步方法,两个对象,先发短信还是打电话

在这里插入图片描述

小结
new this A specific mobile phone object
static Class 唯一的一个模板

6.Insecurity of Collection Classes and Measures

List不安全

List的实现类ArrayList是线程不安全的,Modify error when concurrency occurs in multithreading

在这里插入图片描述

List为何是线程不安全的

ArrayList的add()is there no way to synchronize,因此是线程不安全的.

List不安全解决方案

  1. 将ArrayList替换为Vector
    在这里插入图片描述
    vector是使用了synchronized关键字来保证同步.

  2. 使用CollectionsUtilities are converted to synchronouslist

  3. 使用CopyOnWriteArrayList

在这里插入图片描述

CopyOnWrite写入时复制 COWIt is an optimization strategy of computer design strategy.

多个线程调用的时候,list,读取的时候是固定的,写入(覆盖),When written to avoid coverage,造成数据问题,读写分离.

相比于Vector,CopyOnWriteArrayList的优势在哪?

Vector和CopyOnWriteArrayList都是线程安全的List,底层都是数组实现的,Vector的每个方法都进行了加锁,而CopyOnWriteArrayList的读操作是不加锁的,因此CopyOnWriteArrayList的读性能远高于Vector,Vector每次扩容的大小都是原来数组大小的2倍,而CopyOnWriteArrayList不需要扩容,通过COWThe idea can make the array capacity meet the requirements.

在资源竞争不激烈的情形下,使用Lock性能稍微比synchronized差点点.但是当同步非常激烈的时候,synchronized的性能一下子能下降好几十倍.

set

Modify the error when multithreading also occurs concurrently,Because it is a common method,Do not implement the synchronization of data.
在这里插入图片描述

解决方案:

1.将HashSet通过使用Collections工具类,convert it to synchronousSet
2.使用CopyOnWriteArraySet

HashSet的底层原理就是map,key是无法重复的

Map不安全

HashMap是线程不安全的,The bottom layer does not add any synchronization measures,Also modify the error when multi-threading will occur concurrently.

在这里插入图片描述

在这里插入图片描述
解决方案:

使用线程安全的Map类,例如ConcurrentHashMap

在这里插入图片描述

ConcurrentHash类的put方法中使用了synchronized来保证同步.

在这里插入图片描述

7.Callable(可调用的)

Callable是一个函数式接口,用于创建线程.返回结果并可能引发异常的任务.Implementers only need to implementCallable接口,并重写它的call即可,和Runable相似.

RunableNo return value and no checked exceptions are thrown.

Executors包含的实例方法,Convert from other ordinary forms toCallable接口类.

相比于Runable接口,Callable接口

  1. 可以有返回值
  2. 可以抛出异常
  3. 方法不同,call()

Java创建线程的三种方式

  1. 继承Thread类
  2. 实现Runable接口
  3. 实现Callable接口和Future类

在这里插入图片描述

在这里插入图片描述

具体实现

public class CallableTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyThread myThread =new MyThread(); //Create a thread class written by yourself
        //FutureTask是JDK并发包为Future接口提供的一个实现,代表一个支持取消操作(cancel)的异步计算任务.
        FutureTask futureTask = new FutureTask(myThread);  //适配器类
        new Thread(futureTask,"A").start();
        new Thread(futureTask,"B").start();
        Integer o = (Integer) futureTask.get();  //getmethod may be blocked,得到计算的结果
        System.out.println(o);
    }
}

class MyThread implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        System.out.println("进入call方法");
        return 1024;  //可以有返回值
    }
}

在这里插入图片描述
FutureTask有三种执行状态.

未启动:在FutureTask.run()还没执行之前,FutureTask处于未启动状态.当创建一个FutureTask对象,并且run()方法未执行之前,FutureTask处于未启动状态.
2、已启动:FutureTask对象的run方法启动并执行的过程中,FutureTask处于已启动状态.
3、已完成:FutureTask正常执行结束,或者FutureTask执行被取消(FutureTask对象cancel方法),或者FutureTask对象run方法执行抛出异常而导致中断而结束,FutureTask都处于已完成状态.

FutureTask的get和cancel的执行示意图如下:在这里插入图片描述

细节:

1.有缓存
2.结果可能需要等待,会堵塞.

8.CountDownLatch、CyclicBarrier、Semaphore

CountDownLatch

在这里插入图片描述

实现

public class CountDownLatchDemo {
    

    public static void main(String[] args) {
    
        //减法计算器(The parameter is the number of threads)
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <=6; i++) {
    
            new Thread(()->{
    
                System.out.println(Thread.currentThread().getName()+"go out");
                countDownLatch.countDown(); //数量减1
            },String.valueOf(i)).start();
        }

        //until the calculator is 0,才向下执行
        try {
    
            countDownLatch.await();
            System.out.println("close door");
        } catch (InterruptedException e) {
    
            e.printStackTrace();
        }
    }
}

原理:

  • countDownLatch.countDown(); // 数量-1
  • countDownLatch.await(); // 等待计数器归零,然后再向下执行
    每次有线程调用 countDown() 数量-1,假设计数器变为0,countDownLatch.await() 就会被唤醒,继续
    执行!

CyclicBarrier(加法计算器)

在这里插入图片描述

public static void main(String[] args) {
    
        /** * 集齐7颗龙珠召唤神龙 */
        // 召唤龙珠的线程
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
    
            System.out.println("召唤神龙成功");
        });

        for (int i = 1; i <= 7; i++) {
    
            final int temp = i;
            new Thread(()->{
    
                System.out.println(Thread.currentThread().getName()+"集结了"+temp+"个龙珠");
                try {
    
                    cyclicBarrier.await(); //After the assembly is completed, enter the waiting state
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
    
                    e.printStackTrace();
                }
            }).start();
        }


    }

在这里插入图片描述

Semaphore(信号量)

在这里插入图片描述

实现

在这里插入图片描述

原理

  • semaphore.acquire() 获得,Suppose if the maximum number of threads is full,等待,等待被释放为止!

  • semaphore.release(); 释放,会将当前的信号量释放 + 1,然后唤醒等待的线程!

作用:多个共享资源互斥的使用! 并发限流,控制最大的线程数!

9.读写锁(ReadWriteLock)

ReentrantReadWriteLock是ReadWriteLock的一个实现类,ReadWriteLockmaintainer pairlocks,一个用于读操作,一个用于写操作.

When acquiring a read lock, multiple threads read at the same time when reading,When acquiring the write lock is only one thread to write.

When a thread successfully acquires a read lock,Will get the content that was updated when the last write lock was made.

Read-write locks allow access to shared data with higher concurrency than mutex locks allow.

在这里插入图片描述

演示

不使用读写锁,At this point, there will be multiple threads writing at the same time,不符合实际.

public class ReadWriteTest {
    

    public static void main(String[] args) {
    
        MyCache myCache = new MyCache();

        //创建多个线程
        for (int i = 1; i <=5; i++) {
    
            final int temp =i;  //保证temp变量的不可变
            new Thread(()->{
    
                myCache.put(temp+"",temp);
            },String.valueOf(i)).start();
        }

        for (int i = 1; i <= 5; i++) {
    
            final  int temp =i;
            new Thread(()->{
    
                myCache.get(temp+"");
            },String.valueOf(i)).start();
        }
    }

}




//Unlocked custom cache,When writing, there can be multiple threads writing
class MyCache{
    
    //添加volatile保证可见性
    private volatile Map<String,Object> map = new HashMap<>();

    //写入
    public void put(String key,Object value){
    
        System.out.println(Thread.currentThread().getName()+"写入值"+key);
        map.put(key, value);
        System.out.println(Thread.currentThread().getName()+"写入成功!");
    }

    //读取
    public void get(String key){
    
        System.out.println(Thread.currentThread().getName()+"读取"+key);
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName()+"读取成功!值为:"+o);
    }

}

在这里插入图片描述

使用了读写锁,Only threads write at this point,other threads are reading.


public class ReadWriteTest {

    public static void main(String[] args) {
        MyLockCache myCache = new MyLockCache();

        //创建多个线程
        for (int i = 1; i <=5; i++) {
            final int temp =i;  //保证temp变量的不可变
            new Thread(()->{
                myCache.put(temp+"",temp);
            },String.valueOf(i)).start();
        }

        for (int i = 1; i <= 5; i++) {
            final  int temp =i;
            new Thread(()->{
                myCache.get(temp+"");
            },String.valueOf(i)).start();
        }
    }

}



class MyLockCache{


    //使用volatileFor guaranteed visibility
    private volatile Map<String,Object> map = new HashMap<>();

    //使用读写锁,更加细粒度的控制
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    //写入
    public void put(String key,Object value){
        //获取写锁
        readWriteLock.writeLock().lock();
        try{
            System.out.println(Thread.currentThread().getName()+"写入"+key);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName()+"写入成功");
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            readWriteLock.writeLock().unlock();
        }
    }

    //读取

    public void get(String key){
        //获取读锁
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName()+"读取"+key);
            Object o = map.get(key);
            System.out.println(Thread.currentThread().getName()+"读取成功!值为:"+o);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            readWriteLock.readLock().unlock();
        }

    }
}

Can control only threads are writing,Other threads cannot write.

在这里插入图片描述

10.阻塞队列

The blocking queue is used to store threads that currently do not have the right to execute,队列有大小限制.

写入: 如果队列满了,就必须阻塞等待
取: 如果队列是空的,必须阻塞等待生产.

在这里插入图片描述

A blocking queue is a type of collection
在这里插入图片描述

When to use blocking queues: 多线程并发处理,线程池!

阻塞队列的使用

四组API

方式抛出异常有返回值,不抛出异常阻塞等待超时等待
添加add()offer()put()offer(,)
移除remove()poll()take()poll(.,)
检查队首元素elementpeek()--

1.抛出异常


public class BlockingQueueTest {
    
    public static void main(String[] args) {
    
        //创建阻塞队列,使用ArrayBlockingQueue
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);//The blocking queue capacity is not3

        //Using the following method will throw an exception if the capacity is exceeded
        System.out.println(queue.add("a"));
        System.out.println(queue.add("b"));
        System.out.println(queue.add("c"));
        System.out.println(queue.add("d"));  //抛出队列已满异常

        System.out.println(queue.remove("a"));
        System.out.println(queue.remove("b"));
        System.out.println(queue.remove("c"));

//System.out.println(queue.remove("d"));
    }

// 检查队首元素
        System.out.println(queue.element());

抛出队列已满异常
在这里插入图片描述

2.不抛出异常,有返回值的

      System.out.println(queue.offer("a"));
        System.out.println(queue.offer("b"));
        System.out.println(queue.offer("c"));
        System.out.println(queue.offer("a"));

        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());

        //检查队首元素
        System.out.println(queue.peek());

不会有返回值,且有返回值false
在这里插入图片描述

3.阻塞,等待


    private static void t3(ArrayBlockingQueue<String> queue) throws InterruptedException {
    
        queue.put("a");
        queue.put("b");
        queue.put("c");
        queue.put("d");

        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
        System.out.println(queue.take());
    }

Exceeds thread capacity,不会抛出异常,会一直等待.

4.阻塞,限时等待


System.out.println(queue.offer("a"));
        System.out.println(queue.offer("b"));
        System.out.println(queue.offer("c"));
        System.out.println(queue.offer("d",2, TimeUnit.SECONDS));

        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll());
        System.out.println(queue.poll(2,TimeUnit.SECONDS));

有时限等待,consistent with the second,只是有时间限制.
在这里插入图片描述

SynchronousQueue同步队列

同步队列,No capacity size,The stored value must be taken out before it can be stored again.

实现

  /** * /** * * 同步队列 * * 和其他的BlockingQueue 不一样, SynchronousQueue 不存储元素 * * put了一个元素,必须从里面先take取出来,否则不能在put进去值! * */
    public static void main(String[] args) {
    
        BlockingQueue<String> queue = new SynchronousQueue<>();

        //存入值
        new Thread(()->{
    
            try {
    
            System.out.println(Thread.currentThread().getName()+"put 1");
            queue.put("1");
            System.out.println(Thread.currentThread().getName()+"put 2");
            queue.put("2");
            System.out.println(Thread.currentThread().getName()+"put 3");
            queue.put("3");
            } catch (InterruptedException e) {
    
            e.printStackTrace();
            }
        },"A").start();

        new Thread(()->{
    
            try {
    
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"获得"+queue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"获得"+queue.take());
                TimeUnit.SECONDS.sleep(3);
                System.out.println(Thread.currentThread().getName()+"获得"+queue.take());
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
        },"B").start();
    }

在这里插入图片描述

11.线程池(重点)

Master the three methods,7大参数,4种拒绝策略

池化技术:Prepare some resources in advance,可以拿来使用,But it has to be returned after use.

We all know the creation and destruction of objects,Very high consumption and performance,Therefore, it is necessary to fight for the frequent creation and destruction of objects.,JVM做了优化,Use various pools to manage the creation and destruction of objects,Use thread pools in threads to manage threads,优化资源的使用.

线程池的好处

  1. 降低资源消耗
  2. 提高响应速度
  3. 方便管理

小结: 线程复用、可以控制最大并发数、管理线程.

线程池

在这里插入图片描述

public class ThreadPoolTest {
    

    //Executors工具类的3大方法

    public static void main(String[] args) {
    
        ExecutorService threadPool = Executors.newSingleThreadExecutor(); //创建单个线程池
        ExecutorService threadPool1 = Executors.newFixedThreadPool(4);  //Create a fixed thread pool size
        ExecutorService threadPool2 = Executors.newCachedThreadPool(); //Create a scalable thread pool
        ScheduledExecutorService threadPool3 = Executors.newScheduledThreadPool(2); //Create a scheduled thread pool with a specified number of core threads


        try{
    
            for (int i = 0; i < 100; i++) {
    
                //执行线程池
                threadPool.execute(()->{
    
                    System.out.println(Thread.currentThread().getName()+"ok");
                });
            }
        }catch (Exception e){
    
            e.printStackTrace();
        }finally {
    
            threadPool.shutdown();  //关闭线程池
        }
        
    }

}

7大参数

public static ExecutorService newSingleThreadExecutor() {
    
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    
return new ThreadPoolExecutor(5, 5,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool() {
    
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 本质ThreadPoolExecutor()
public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小
int maximumPoolSize, // 最大核心线程池大小
long keepAliveTime, // 超时了没有人调用就会释放
TimeUnit unit, // 超时单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory, // 线程工厂:创建线程的,一般
不用动
RejectedExecutionHandler handle // 拒绝策略) {
    
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

  • int corePoolSize, // 核心线程池大小
  • int maximumPoolSize, // 最大核心线程池大小
  • long keepAliveTime, // 超时了没有人调用就会释放
  • TimeUnit unit, // 超时单位
  • BlockingQueue workQueue, // 阻塞队列
  • ThreadFactory threadFactory, // 线程工厂:创建线程的,一般不用动
  • RejectedExecutionHandler handle // 拒绝策略

手动创建一个线程池

public class MyThreadPool {
    

    /** * 线程池四种拒绝策略 * AbortPolicy -- When task added to thread pool is rejected,它将抛出RejectedExecutionException异常 * CallerRunsPolicy -- When task adding thread pool is rejected,会在线程池当前正在运行的ThreadThread pool with rejection的任务 * DiscardOldestPolicy --When task added to thread pool is rejected,The thread pool will give up waiting for the oldest unprocessed task in the queue.然后将被拒绝的任务添加到等待队列中. * DiscardPolicy --When task added to thread pool is rejected,The thread pool will discard rejected tasks. * @param args */
    public static void main(String[] args) {
    


        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
                2,  //核心线程数
                5,  //最大线程数
                3,   //活跃时间,
                TimeUnit.SECONDS,   //时间单位
                new LinkedBlockingQueue<>(3), //链表阻塞队列
                Executors.defaultThreadFactory(),  // 默认的线程池工厂
                new ThreadPoolExecutor.AbortPolicy()  //拒绝策略
        );

     try{
    
         for (int i = 0; i < 10; i++) {
    
             threadPool.execute(()->{
    
                 System.out.println(Thread.currentThread().getName()+"ok");
             });
         }
     }catch (Exception e){
    
         e.printStackTrace();
     }finally {
    
         threadPool.shutdown();
     }

    }
}


在这里插入图片描述

四种拒绝策略

  • AbortPolicy – When task added to thread pool is rejected,它将抛出RejectedExecutionException
  • CallerRunsPolicy – When task adding thread pool is rejected,会在线程池当前正在运行的ThreadThread pool with rejection
  • DiscardOldestPolicy --When task added to thread pool is rejected,The thread pool will give up waiting for the oldest unprocessed task in the queue.然后将
  • DiscardPolicy --When task added to thread pool is rejected,The thread pool will discard rejected tasks.

小结与拓展

The size of the number of maximum thread pool how to set up

  • CPU密集型 :The maximum number of threads in the pool isCPU的核数,保持CPU效率最高!
  • IO密集型:The maximum number of threads in the pool is greater than that in the program.IO的线程.

CPU密集型

public class MyThreadPool01 {
    
    //CPU密集型,Define the maximum number of threads in the thread pool as the number of cores
    public static void main(String[] args) {
    
        System.out.println(Runtime.getRuntime().availableProcessors());

        //CPU密集型
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
                Runtime.getRuntime().availableProcessors(),
                3,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        try{
    
            for(int i=1 ;i<=9; i++){
    
                threadPool.execute(()->{
    
                    System.out.println(Thread.currentThread().getName()+"ok");
                });
            }
        }catch (Exception e){
    
            e.printStackTrace();
        }finally {
    
            threadPool.shutdown();
        }
    }
}

IOIntensive, you need to judge the cost of the programIO的线程总数,Then the number of threads in the thread pool is greater than.

12.四大函数式接口

新时代的程序员:lambda表达式,链式编程,函数式接口,stream流计算.

函数式接口: 只有一个方法的接口

foreachThe bottom layer of the parameters inside is a consumer functional interface
在这里插入图片描述

Function函数式接口(有传入参数T和返回类型R)

在这里插入图片描述

/** * 函数式接口 */
public class FunctionDemo {
    

    public static void main(String[] args) {
    
        //使用function接口(Need to pass in parameters and specify the return value type)
        Function<String,String> f1 =  (str)->{
    return str;}; //函数式接口和lambdaExpression integration
        System.out.println(f1.apply("liang"));
    }
}

在这里插入图片描述

Predicate断定式接口(有输入参数,Has a return type but is a boolean)

在这里插入图片描述

实践
在这里插入图片描述
Consumer消费者接口(Only input parameters,没有返回值类型)
在这里插入图片描述

实践
在这里插入图片描述
Supplier 供给型接口:只有输出,没有输入

在这里插入图片描述

实践

在这里插入图片描述

13.Stream流式计算

什么是Stream计算

大数据: 存储+计算
集合、MySQL本质就是存储数据的.
The computation is handed over to the stream for implementation.

在这里插入图片描述

实现
在这里插入图片描述

14.分支合并(ForkJoin)

什么是ForkJoin

Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架.

在这里插入图片描述

使用的算法

工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行.

在这里插入图片描述

那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如 A 线程负责处理 A 队列里的任务.但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理.干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行.而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行.

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时.并且消耗了更多的系统资源,比如创建多个线程和多个双端队列.

设计ForkJoin框架

第一步分割任务.首先我们需要有一个 fork 类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小.

第二步执行任务并合并结果.分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行.子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据.

Fork/Join 使用两个类来完成以上两件事情:

ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务.它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:
RecursiveAction:用于没有返回结果的任务.
RecursiveTask :用于有返回结果的任务.
ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部.当一个工作线程的队列里暂时没有任务时,It randomly steals a task from the tail of other worker threads' queues.

在这里插入图片描述

Fork/Join框架的异常处理

ForkJoinTask 在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally() 方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过 ForkJoinTask 的 getException 方法获取异常.

if(task.isCompletedAbnormally())
{
   System.out.println(task.getException());
}

ForkJoin框架的实现原理

ForkJoinPool是由ForkJoinTask数组和ForkJoinWorkerThread数组组成.

ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务.而ForkJoinWorkerThread数组负责执行这些任务.

在这里插入图片描述

原理探究

ForkJoinTask 的 fork 方法实现原理.当我们调用 ForkJoinTask 的 fork 方法时,程序会调用 ForkJoinWorkerThread in the queue push 方法异步的执行这个任务,然后立即返回结果.代码如下:

在这里插入图片描述

push方法把当前任务存放在 ForkJoinTask 数组 queue 里.然后再调用 ForkJoinPool 的 signalWork() 方法唤醒或创建一个工作线程来执行任务.代码如下:

在这里插入图片描述

ForkJoinTask 的 join 方法实现原理.Join 方法的主要作用是阻塞当前线程并等待获取结果.让我们一起看看 ForkJoinTask 的 join 方法的实现,代码如下:

在这里插入图片描述

首先,它调用了 doJoin() 方法,通过 doJoin() 方法得到当前任务的状态来判断返回什么结果,任务状态有四种:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL).

在这里插入图片描述

  • 如果任务状态是已完成,则直接返回任务结果.
  • 如果任务状态是被取消,则直接抛出 CancellationException.
  • 如果任务状态是抛出异常,则直接抛出对应的异常.

在这里插入图片描述

在 doJoin() 方法里,首先通过查看任务的状态,看任务是否已经执行完了,如果执行完了,则直接返回任务状态,如果没有执行完,则从任务数组里取出任务并执行.如果任务顺利执行完成了,则设置任务状态为 NORMAL,如果出现异常,则纪录异常,并将任务状态设置为 EXCEPTIONAL.

实现
1.创建一个ForkJoin任务的实现类

package com.forkjoin;


import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/** * 如何使用forkjoin * 计算类需要继承ForkJoinTask * ForkJoinTask的实现类:RecursiveActive,RecursiveTask */
public class ForkJoinDemo extends RecursiveTask<Long> {
    

    private Long start;
    private Long end;


    //临界值
    private Long temp = 10000L;

    //构造方法
    public ForkJoinDemo(Long start, Long end) {
    
        this.start = start;
        this.end = end;
    }

    //计算方法
    @Override
    protected Long compute() {
    
        //Between the critical value is divided into two cases
        Long sum = 0L;
        if ((end - start) < temp) {
    
            for (Long i = start; i <=end; i++) {
    
                sum += i;
            }
            return sum;
        } else {
    //使用forkjoin计算
            Long middle = (start + end)/2; //中间值
            ForkJoinTask<Long> task1 = new ForkJoinDemo(start, middle);
            task1.fork();  //将任务放到ForkJoinPool中的ForkJoinin the task array,ForkJoinPoolwake up the thread to execute it.
            ForkJoinTask<Long> task2 = new ForkJoinDemo(middle + 1, end);
            task2.fork();
            return task1.join() + task2.join();  //joinThe method gets the return result of each subtask
        }

    }

}

2.Perform tasks differently

public class ForkJoinTest {
    

    public static void main(String[] args) throws ExecutionException, InterruptedException {
    
        //m1(); // 普通方法
        //m2(); // 使用forkjoin
        m3();    // 使用stream流,特别快

    }

    private static void m3() {
    
        long start = System.currentTimeMillis();
        //stream并行流计算
        long sum = LongStream.rangeClosed(1L, 10_0000_0000L)
                .parallel() //并行计算
                .reduce(0, Long::sum);//Long::sum java新特性,More concise method calls
        long end = System.currentTimeMillis();
        System.out.println("sum = "+sum+",时间: "+(end-start));
    }

    private static void m2() throws InterruptedException, ExecutionException {
    
        Long start = System.currentTimeMillis();
        //创建和使用ForkJoinPool
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        //分割任务
        ForkJoinTask<Long> task =new ForkJoinDemo(1L, 10_0000_0000L);
        //使用线程池提交任务.
        ForkJoinTask<Long> submit = forkJoinPool.submit(task);

        //得到执行的结果
        Long sum = submit.get();

        Long end = System.currentTimeMillis();
        System.out.println("使用forkJoin返回结果:"+sum+"执行结果"+(end-start));
    }


    private static void m1() {
    
        Long sum = 0L;
        //开始时间
        Long start = System.currentTimeMillis();
        //结束时间
        for(Long i= 1L; i< 10_0000_0000L;i++)
        {
    
            sum += i;
        }
        Long end = System.currentTimeMillis();
        System.out.println("The normal method consumes time of:"+(end-start));
    }

}

15.异步回调

Callbacks are divided into synchronous and asynchronous.

同步回调 : Some of our commonly used requests are synchronous callbacks,同步回调是阻塞的,单个的线程需要等待结果的返回才能继续执行.
在这里插入图片描述

异步回调: 有的时候,We don't want the program to block all the time on an execution method,Need to execute subsequent methods first,That's the async callback here.我们在调用一个方法时,如果执行时间比较长,我们可以传入一个回调的方法,当方法执行完时,让被调用者执行给定的回调方法.
在这里插入图片描述

Future设计的初衷:对将来的某个事件的结果进行建模.

在这里插入图片描述

异步执行: CompletableFuture//异步执行: Regardless of success or failure,都会回调.

Asynchronous callback is divided into return value and no return value
runAsync()没有返回值
supplyAsync()有返回值

在这里插入图片描述
whenCompleteAsync的构造参数(Consumer interface variables,异常类变量)

演示

        CompletableFuture<Integer> supplyAsync = CompletableFuture.supplyAsync(() -> {
    
            System.out.println(Thread.currentThread().getName() + "调用supplyAsync()方法,有返回值");
            int i=10/0;
            return 200;
        });
        System.out.println(supplyAsync.whenComplete((t,u)->{
    
            System.out.println("t => "+t);  //获取返回的结果
            System.out.println("u =>"+u);   //获取抛出的异常
            //异常捕获
        }).exceptionally((e)->{
    
            System.out.println(e.getMessage());
            return 404;  //You can get the result returned by the error
        }).get());

在这里插入图片描述

16.JMM

谈谈对volatile的理解

volatile是JavaVirtual machine guarantees a lightweight synchronization mechanism
1.保证可见性
2不保证原子性
3.禁止指令重排

什么是JMM

JMM:Java内存模型,It abstracts the relationship between threads and main memory.

关于JMM的一些同步约定:

  1. 线程解锁前,Must flush shared variables back to memory immediately.
  2. 线程加锁前,Must read the latest value of shared variable in main memory to working memory!
  3. 加锁和解锁是同一把锁

around the thread、工作内存、主内存的8种操作

在这里插入图片描述
加上lock,unlock八种
在这里插入图片描述

JMMThe eight directive provisions used

在这里插入图片描述
问题:The program does not know that the value of the main memory shared variable has been modified

在这里插入图片描述

17. volatile

volatile保证可见性

public class volatileDemo {

    private volatile static int num = 0;
    //visibility 可见性
    //volatile保证可见性, 若不加volatile会进入死循环
    public static void main(String[] args) {

        //Create a secondary thread
        new Thread(()->{
            while (num==0){  //The secondary thread does not know if the primary thread has modified the value of the shared variable
                ;
            }
            System.out.println(Thread.currentThread().getName()+": num ="+num);
        },"A").start();
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        num =1;
        System.out.println(Thread.currentThread().getName()+": num ="+num);

    }
}

volatile不保证原子性

原子性:means that an operation is indivisible,要么都执行,要么都不执行.



    private volatile static int num = 0;
    //visibility 可见性
    //volatile保证可见性, 若不加volatile会进入死循环
    public static void main(String[] args) {
    

// visibility(); //测试volatile可见性
                       //atomicity
        for(int i =1 ;i<=20 ;i++)
        {
    
            new Thread(()->{
    
                for(int j=0;j<1000;j++){
    
                    add();
                }
            }).start();
        }
        while(Thread.activeCount()>2){
    
            Thread.yield();  //More than two threads give way to live threads
        }
        //使用volatileAlso data is lost
        System.out.println(Thread.currentThread().getName()+" "+num);
    }


    public static void add()
    {
    
        num++;
    }

在这里插入图片描述
在这里插入图片描述

使用原子性,解决原子性问题
在这里插入图片描述

public class volatileDemo {
    

    //volatile不能保证原子性,使用原子类Integer
    private volatile static AtomicInteger num = new AtomicInteger();
    //visibility 可见性
    //volatile保证可见性, 若不加volatile会进入死循环
    public static void main(String[] args) {
    

// visibility(); //测试volatile可见性
                       //atomicity
        for(int i =1 ;i<=20 ;i++)
        {
    
            new Thread(()->{
    
                for(int j=0;j<1000;j++){
    
                    add();
                }
            }).start();
        }
        while(Thread.activeCount()>2){
    
            Thread.yield();  //More than two threads give way to live threads
        }
        //使用volatileAlso data is lost
        System.out.println(Thread.currentThread().getName()+" "+num);
    }


    public static void add()
    {
    
// num++;
        num.getAndIncrement();
    }

这些类的底层都直接和操作系统挂钩!在内存中修改值!Unsafe类是一个很特殊的存在!

volatle禁止指令重排序

指令重排序: 就是你写的程序,Computers may not execute in the order you think.

源代码–>编译器优化的重排–>指令并行也可能会重排–>内存系统也会重排–>执行

在这里插入图片描述
在这里插入图片描述

非计算机专业

在这里插入图片描述
Volatile是可以保持可见性,不能保证原子性,由于内存屏障,It can guarantee to avoid the phenomenon of instruction rearrangement.

18.Dive into monotonic mode

饿汉式,DCL懒汉式,深究!

饿汉式

//饿汉单例模式
public class Hungry {
    

    //可能会浪费空间
    private byte[] data1 = new byte[1024];
    private byte[] data2 = new byte[1024];
    private byte[] data3 = new byte[1024];
    private byte[] data4 = new byte[1024];
    
    private Hungry(){
    
        
    }
    
    //Initialize the object as soon as the class is loaded
    private final static Hungry HUNGRY = new Hungry();
    
    public Hungry getInstance(){
    
        return HUNGRY;
    }
}

DCL懒汉模式(双重锁懒汉模式)

package com.singtle;


//DCL懒汉单例模式:Can be destroyed using reflection

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;

public class LazyMan {
    

    //Measures to prevent reflection damage,Called only once for instantiation,Add a pair of boolean values ​​to control,Prevent reflection from creating objects twice
    private static boolean flag = false;

    public LazyMan(){
    
        if(flag == false){
    
            flag =true;  //Mark first initialization variable modification
        }else{
    
            throw new RuntimeException("Don't try to create objects through reflection");
        }
    }

    //双重检测模式
    private volatile static LazyMan lazyMan;

    //懒汉模式,Load the class method to initialize the object
    public LazyMan getInstance(){
    
        if(lazyMan == null){
      //外层if提高效率
            synchronized (lazyMan){
    
                if(lazyMan == null){
      //Inner circulation ensureslazyManNot initialized
                    lazyMan = new LazyMan();
                }
            }
        }
        return lazyMan;
    }

    //反射,But you can still get multiple different instance objects by modifying the boolean value
    public static void main(String[] args) throws NoSuchFieldException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
    
        //Get the properties of a boolean variable
        Field flag = LazyMan.class.getDeclaredField("flag");
        flag.setAccessible(true);  //设置可以访问

        //get initializer
        Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null);//Get the default initializer
        declaredConstructor.setAccessible(true);
        //Get the instance created by the constructor
        LazyMan lazyMan = declaredConstructor.newInstance();

        flag.set(lazyMan,false);  //to modify the objectflag值

        LazyMan lazyMan1 = declaredConstructor.newInstance();

        System.out.println(lazyMan);
        System.out.println(lazyMan1);
    }

}

1.分配内存空间
2.执行构造方法,初始化对象
3.把这个对象指向这个空间

静态内部类

/** * 静态内部类,Pass the instantiation of the object to the inner class */
public class Holder {
    
    public Holder(){
    
        
    }
    
    public Holder getInstance(){
    
        return Inter.HOLDER;
    }
    
    public static class Inter{
    
        public static final Holder HOLDER = new Holder();
    }
}

枚举类型(单例不安全,反射)

    public static void main(String[] args) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
    
        EnumSingle instance1 = EnumSingle.INSTANCE;
        Constructor<EnumSingle> declaredConstructor = EnumSingle.class.getDeclaredConstructor(null);
        declaredConstructor.setAccessible(true);
        EnumSingle instance2 = declaredConstructor.newInstance();
// Is the same object created and instantiated via reflection
        System.out.println(instance1);
        System.out.println(instance2);
    }

在这里插入图片描述

枚举类型的最终反编译源码:

在这里插入图片描述

19.深入理解CAS

CAS,Optimistic locking uses this mechanism.

CAS是一个compareandset:比较并交换

CAS使用

public class CASDemo {
    
    public static void main(String[] args) {
    
        //使用原子类,保证原子性
        AtomicInteger atomicInteger = new AtomicInteger(2020);

        //交换atomicInteger的值
        System.out.println(atomicInteger.compareAndSet(2020, 2021));
        System.out.println(atomicInteger.get());
        atomicInteger.getAndIncrement();  //获取并自增
        System.out.println(atomicInteger.compareAndSet(2020, 2021));
        System.out.println(atomicInteger.get());
    }
}

结果
在这里插入图片描述

底层 Unsafe类
在这里插入图片描述
在这里插入图片描述

CAS:Compare the current job site memory value with the main memory value,,如果这个值是期望的,那么则执行操作!如果不是就一直循环!

缺点

  • 循环耗时
  • 一次性只能保证一个共享变量的原子性
  • ABA问题

CBA问题
在这里插入图片描述
如果我期望的值达到了,那么就更新,否则,就不更新, CAS 是CPU的并发原语.

public class CASDemo2 {
    

    public static void main(String[] args) {
    

        AtomicInteger atomicInteger = new AtomicInteger(2020);

        //捣乱的线程

        System.out.println(atomicInteger.compareAndSet(2020, 2021));
        System.out.println(atomicInteger.get());
        System.out.println(atomicInteger.compareAndSet(2021, 2020));
        System.out.println(atomicInteger.get());

        //期望的线程
        System.out.println(atomicInteger.compareAndSet(2020, 6666));
        System.out.println(atomicInteger.get());
    }
}

在这里插入图片描述

20.原子引用

AtomicInteger、AtomicBoolean、AtomicLong、AtomicReference These atom types,All of them are based on volatile 关键字 +CAS Algorithmic lock-free operation to ensure thread safety of shared data under multi-threaded operations.

  • volatileThe key to ensure the visibility between the threads,When a thread operates- made byvolatile关键字修饰的变量,Changes to this shared variable are immediately visible to other threads.
  • CAS算法,That is, the comparison exchange algorithm,是由UNSAFE提供的,essentially by operatingCPUinstructions to be guaranteed.- -CASAlgorithms provide a fail-fast way,Fail fast when a thread modifies data that has been changed.
  • 当CASWhen an algorithm fails to operate on shared data,Because of the blessing of the spin algorithm,Our updates to shared data are eventually calculated.
    总之,Spinlocks for atomic types+CASThe lock-free operation guarantees the thread safety and atomicity of shared variables

ABA问题

绝大多数情况下,CASAlgorithms are fine,But in operations that need to care about changing values, there will be ABA 的问题,For example, a value turns out to beA,变成了B,后来又变成了A,那么CAS检查时会发现它的值没有发生变化,But in fact it has changed.

如何避免CAS算法带来的ABA问题呢?

Operations for optimistic locking in concurrent situations,We usually increment the version number,For example, the implementation of optimistic locking in the database,This solves the problems caused by concurrent operationsABA问题.在JavaThe realization of the atoms in the package also provides suchAtomicStampedReference.
注意:

Interger对象的复用

IntegerObject caching mechanism,使用valueof会使用缓存,而new一定会创建新的对象分配新的内存空间.

Comparison between identical package types

在这里插入图片描述
实践

public class AtomicReference {
    

    //常规的使用CASThe method performs lock-free self-addition or replaces the header of the stack,出现ABA问题.
    //AtomicStampReference 注意,If it is a wrapper class,Be aware of reference issues between objects.
    //使用AtomicStampReference是为了解决CAS中的ABA问题,
    //CASOnly compares the current value and the memory value for equality,而AtomicStampReferencealso compares references for equality,Then compare whether the values ​​are equal,从而避免ABA问题.
    static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(1,1);  //初始化引用和版本号


    public static void main(String[] args) {
    

        //A线程 :捣乱线程


        new Thread(()->{
    
            int stamp = atomicStampedReference.getStamp();//获取版本号
            System.out.println("stamp = "+stamp);
            //线程睡眠1s
            try {
    
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
            //Use references and version numbers forCAS
            atomicStampedReference.compareAndSet(1, 2, atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1 );
            System.out.println("a2 = "+ atomicStampedReference.getStamp());

            System.out.println(atomicStampedReference.compareAndSet(2,1, atomicStampedReference.getStamp()+1,atomicStampedReference.getStamp()));
            System.out.println("a3 = "+atomicStampedReference.getStamp());
        },"A").start();
        //B线程:预期线程
        new Thread(()->{
    
            int stamp = atomicStampedReference.getStamp();  //获取版本号
            System.out.println("b1 = "+stamp);
            try {
    
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
            System.out.println(atomicStampedReference.compareAndSet(1, 6, stamp, stamp + 1));
            System.out.println(" b2 = "+atomicStampedReference.getStamp());
        },"B").start();
    }
}

21.锁讲解

公平锁和非公平锁

公平锁:非常公平,必须先来后到,依次执行.
非公平锁: 非常不公平,可以插队(默认都是非公平锁)

在这里插入图片描述

可重入锁(递归锁)

  • 可重入锁: 当线程获取某个锁后,还可以继续获取该锁,可以递归调用,而且不会产生死锁.
  • A non-reentrant lock is the opposite of a reentrant lock,After acquiring the lock, it cannot be acquired repeatedly,否则会产生死锁(自己锁自己)

可重入锁

synchronized

/** * 可重入锁 Synchronized */
public class ReentrantLock {
    

    public static void main(String[] args) {
    
        Phone phone = new Phone();
        //Cell Phone Resources

        new Thread(()->{
    phone.send();},"A").start();
        new Thread(()->{
    phone.reception();},"B").start();
    }
}


//资源类

class Phone{
    
    public synchronized void send(){
    
        System.out.println("发送消息");
        reception(); //这里也有锁,Reentrant lock recursive call
    }

    public synchronized void reception(){
    
        System.out.println("接收消息");
    }
}

Lock版

/** * 可重入锁: lock */
public class ReentrantLockDemo2 {
    

    public static void main(String[] args) {
    
        Phone1 phone = new Phone1();
        new Thread(()->{
    phone.send();},"A").start();
        new Thread(()->{
    phone.reception();},"A").start();
    }
}

//资源类

class Phone1{
    
    
    //创建一个可重入锁
    private Lock lock = new ReentrantLock();
    
    public synchronized void send(){
    
        //加锁
        lock.lock();
        //再次加锁
        lock.lock();
        try {
    
            System.out.println("发送消息");
            reception(); //这里也有锁,Reentrant lock recursive call
        }catch (Exception e){
    
            e.printStackTrace();
        }finally {
    
            lock.unlock();  //Locked several times to unlock several times
            lock.unlock(); 
        }
    }

    public synchronized void reception(){
    
       lock.lock();
       try{
    
           System.out.println("接收消息");
       }catch (Exception e){
    
           e.printStackTrace();
       }finally {
    
           lock.unlock();  //解锁
       }
    }
}

不可重入锁

基于notify/wait实现不可重入锁

/** * 不可重入锁 : wait,notify */
public class ReentrantForbiddenLock {
    

    private Thread owner;  //拥有锁的线程,Empty means no one owns

    public synchronized void lock() throws InterruptedException {
    
        Thread thread = Thread.currentThread();  //获取当前线程

        //使用While,可以防止虚假唤醒
        while(owner != null){
    
            System.out.println(String.format("%s等待%s 释放锁",thread.getName(),owner.getName()));
            wait();  //进入阻塞状态
        }
        System.out.println(thread.getName()+"获得了锁");
        owner = thread;
    }


    public synchronized void unlock(){
    
        if(Thread.currentThread() != owner)  //Only the thread holding the lock has the right to release the lock
        {
    
            throw  new IllegalMonitorStateException();
        }
        System.out.println(owner.getName()+"释放了锁");
        owner =null;
        notify();
    }

    public static void main(String[] args) throws InterruptedException {
    
        ReentrantForbiddenLock lock = new ReentrantForbiddenLock();
        lock.lock();  //获得锁
        lock.lock();  //再次获取锁
    }

}

mainThe thread did not release the lock,Lock is called again,出现死锁.

自旋锁

Based on the spin lock non-reentrant lock

自旋锁,That is, the thread acquiring the lock is occupied when the lock is occupied,won't stop and wait,Rather keep trying,直到获取锁成功.

  • 好处: Threads are more active,减少线程上下文切换的开销.
  • 坏处: 很耗cpu,Especially when the wait time is long.Suitable for multi-core systems.
 
/** * 不可重入锁 : cas */
public class ReentrantForbiddenLockDemo {
    

    //原子引用: 持有锁的线程,Empty means no one owns
    private AtomicReference<Thread> owner = new AtomicReference<>();

    /** * 使用cas原子操作,而不使用synchronized同步了 */

    public void lock(){
    
// compareAndSet:原子操作,OS implementation dependent
        Thread thread = Thread.currentThread();  //获取当前线程
        //被其他的线程修改了,Abort this operation,Keep trying
        while(!owner.compareAndSet(null, thread)){
    
            System.out.println(String.format("%s等待%s释放锁", thread.getName(),owner.get().getName()));
        }
        System.out.println(thread.getName()+"获取到了锁");
    }

    public void unlock(){
    
        Thread thread = Thread.currentThread();
        //如果能进行cas,then the thread can be released
        if(owner.compareAndSet(thread, null)){
    
            System.out.println(thread.getName()+"释放了锁");
            return;
        }
        throw new IllegalMonitorStateException();
    }

    public static void main(String[] args) {
    
        ReentrantForbiddenLockDemo lock = new ReentrantForbiddenLockDemo();
        lock.lock();
        lock.lock();
    }
}

在这里插入图片描述

死锁

死锁:That is, the lock has already other threads(或自身)占有了,And he owns the lock of the other thread,Each other does not release the locks held by each other,导致死锁的现象.

/** * 死锁 */
public class DeadLock {
    
    public static void main(String[] args) {
    
        String lockA = "lockA";
        String lockB = "lockB";

        new Thread(new MyThread(lockA, lockB)).start();
        new Thread(new MyThread(lockB, lockA)).start();
    }

}

class MyThread implements Runnable{
    

    //两个锁资源
    private String lockA;
    private String lockB;

    public MyThread(String lockA, String lockB) {
    
        this.lockA = lockA;
        this.lockB = lockB;
    }

    @Override
    public void run() {
    
        synchronized (lockA){
    
            System.out.println(Thread.currentThread().getName()+"lock:"+lockA+" =>" +lockB);
            try {
    
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
            synchronized (lockB){
    
                System.out.println(Thread.currentThread().getName()+"lock"+lockB+"=>get"+lockA);
            }
    }
}}


在这里插入图片描述

原网站

版权声明
本文为[paiidds]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/223/202208110657216570.html