当前位置:网站首页>Future 用法详解

Future 用法详解

2022-04-23 17:27:00 Flechazo`

Future 用法详解

前言

为什么出现Future机制

常见的两种创建线程的方式。一种是直接继承Thread,另外一种就是实现Runnable接口。

这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。

从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Future模式的核心思想是能够让主线程将原来需要同步等待的这段时间用来做其他的事情。

因为可以异步获得执行结果,所以不用一直同步等待去获得执行结果。

Future 用法详解

用法很简单

	  	System.out.println(" main start ");
        FutureTask<Integer> integerFutureTask = new FutureTask<>(new TestA());


        new Thread(integerFutureTask).start();
        System.out.println(" integerFutureTask ...");
        Integer integer = integerFutureTask.get();
        System.out.println(integer);
        System.out.println(" main end ");

class TestA implements Callable<Integer>{
    

    @Override
    public Integer call() throws Exception {
    
       System.out.println(" call start ");
        Thread.sleep(10000);
        System.out.println(" call end ");
        return 1;
    }
}

get时候会一直阻塞获取

在这里插入图片描述

当然也可以设置超时时间

public V get(long timeout, TimeUnit unit)

Future 简单原理

使用起来很简单,具体原理我们来研究一下

我们都知道Thread只能运行Runable接口

那返回值是怎么返回的呢?

FutureTask<Integer> integerFutureTask = new FutureTask<>(new TestA());此行代码可以看到FutureTask实现了 RunnableFuture接口

在这里插入图片描述
继续看RunnableFuture接口继承了我们的Runnable接口 还继承了一个 Future接口
在这里插入图片描述
Future接口
定义了返回值的一些方法

public interface Future<V> {
    

//取消
    boolean cancel(boolean mayInterruptIfRunning);

  //是否取消
    boolean isCancelled();

  //是否执行
    boolean isDone();

    // 获取
    V get() throws InterruptedException, ExecutionException;

//超时获取
  
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

原理流程如下

第一步
在这里插入图片描述

初始化任务,设置线程状态

我们可以看到线程几个状态以及从开始到结束的一些状态

在这里插入图片描述

outcome 是输出的返回值
runner 表示正在运行的线程
waiters 表示正在等待的线程

第二步

run方法运行

    public void run() {
    
    	//判断是否在运行 
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
    
        	//
            Callable<V> c = callable;
            if (c != null && state == NEW) {
    
                V result;
                boolean ran;
                try {
    
                	//实际 底层还是运行call接口
                    result = c.call();
                    //运行状态
                    ran = true;
                } catch (Throwable ex) {
    
                    result = null;
                    ran = false;
                    //处理异常
                    setException(ex);
                }
                if (ran)
                //c.call() 运行结束后,设置结果 
                    set(result);
            }
        } finally {
    
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            //如果线程被打断 
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

设置结果将线程状态设置,同时设置值 outcome

在这里插入图片描述

第三部
get 获取结果

在这里插入图片描述
可以看到当状态没有大于等于 COMPLETING 会阻塞

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
    
        //如果线程被打断 
            if (Thread.interrupted()) {
    
            //等待队列里移除
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            //执行完成
            if (s > COMPLETING) {
    
           
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
            //交出cpu执行权 再竞争
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
            //下一个线程 唤醒
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
    
            //超时
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
    
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

返回值 强转类型

在这里插入图片描述

线程池返回原理

提交有参数返回任务
在这里插入图片描述

提交代码
同样的知道单个是这样返回的,那

也是交给 FutureTask 去执行
在这里插入图片描述

自定义返回任务值

可以参考这个实现自己的提交

获取数据接口

public interface Future<T> {
    

    T get() throws InterruptedException;
}

做事情接口

public interface FutureTask<T> {
    

    T call();
}

异步提交处理任务

public class FutureTaskService {
    

    /** * 异步提交处理任务 * @param futureTask * @param <T> * @return */
    public <T> Future<T> submit(final FutureTask<T> futureTask){
    

        //异步返回
        AysFutureTask<T> aysFutureTask = new AysFutureTask();

        //线程处理任务
        new Thread(()->{
    
            //执行任务
            T call = futureTask.call();
            //执行完任务 通知返回
            aysFutureTask.done(call);

        }).start();

        //异步返回
        return aysFutureTask;
    }

}

测试

public class MainTest {
    

    public static void main(String[] args) throws InterruptedException {
    

        FutureTaskService futureTaskService = new FutureTaskService();
        Future<String> submit = futureTaskService.submit(() -> {
    
            //提交任务
            return doThing();
        });


        System.out.println(" -------- 返回 ------- ");
        System.out.println(" --------- 做其他事情 ----- ");
        System.out.println(" --------- do other ----- ");
        //获取 刚才提交的任务
        System.out.println(submit.get());


    }


    /** * 模拟数据库读写 或者 网络请求 * @return */
    private static String doThing(){
    

        try {
    
            Thread.sleep(5_000);
        } catch (InterruptedException e) {
    
            e.printStackTrace();
        }

        return " 做事情...";
    }
}

测试结果

在这里插入图片描述
可以看到我们先提交的任务,前面其他事情处理完后之后再获取结果,这期间这个任务就在执行,同一时间执行多个任务,在最后的时刻阻塞获取结果。

最后

FutureTask是Future的具体实现。

FutureTask实现了RunnableFuture接口。RunnableFuture接口又同时继承了Future 和 Runnable 接口。

Thread 可以提交 FutureTask 实际是执行 call方法
然后使用cas比较线程状态等待获取结果

Future继承图
在这里插入图片描述

当然 Future 还有其他扩展用法,如 CompletableFuture等

版权声明
本文为[Flechazo`]所创,转载请带上原文链接,感谢
https://pilgrim.blog.csdn.net/article/details/124333448