
Executor、ExecutorService、Executors、ThreadPoolExecutor、Future、Runnable、Callable

2022-04-23 19:17:00 Li Siwei

Executor

Executor is an interface: an object that implements it executes submitted Runnable tasks.
The interface decouples task submission from the mechanics of how each task is run, including the details of thread use and scheduling.

An Executor is normally used instead of explicitly creating threads. For example, rather than creating and starting a thread for each task with new Thread(new Runnable(){...}).start(), you can write:

Executor executor = anExecutor;    // any Executor implementation
executor.execute(new Runnable() {
    public void run() { /* first task */ }
});
executor.execute(new Runnable() {
    public void run() { /* second task */ }
});

However, the Executor interface does not strictly require execution to be asynchronous. In the simplest case, an executor can run the submitted task immediately in the caller's thread:

class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();    // runs synchronously in the calling thread
    }
}

More typically, tasks are executed in some thread other than the one that called execute. The executor below creates a new thread for each task:

class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}
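
To make the difference between the two executors concrete, the following sketch reports whether a task ran on the thread that called execute. The class name ExecutorThreadDemo and the helper ranOnCallerThread are hypothetical names introduced for this illustration; the two lambdas are assumed to behave like DirectExecutor and ThreadPerTaskExecutor above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

class ExecutorThreadDemo {
    /** Runs one task on the given executor and reports whether it ran on the calling thread. */
    static boolean ranOnCallerThread(Executor executor) {
        Thread caller = Thread.currentThread();
        Thread[] worker = new Thread[1];
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            worker[0] = Thread.currentThread();
            done.countDown();
        });
        try {
            // countDown() happens-before await() returns, so worker[0] is visible here
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return worker[0] == caller;
    }

    public static void main(String[] args) {
        Executor direct = Runnable::run;                  // same behavior as DirectExecutor
        Executor perTask = r -> new Thread(r).start();    // same behavior as ThreadPerTaskExecutor
        System.out.println(ranOnCallerThread(direct));    // prints "true"
        System.out.println(ranOnCallerThread(perTask));   // prints "false"
    }
}
```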

Many Executor implementations impose some restriction on how and when tasks are scheduled. The executor below serializes the tasks submitted to it and hands them one at a time to a second Executor, which makes it a composite executor:

class SerialExecutor implements Executor {
    final Queue<Runnable> tasks = new ArrayDeque<>();
    final Executor executor;
    Runnable active;

    public SerialExecutor(Executor executor) {
        this.executor = executor;
    }

    public synchronized void execute(final Runnable r) {
        // Queue.offer(E e) appends the element at the tail; ArrayDeque is unbounded and grows as needed.
        tasks.offer(() -> {
            try {
                r.run();
            } finally {
                scheduleNext();
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    protected synchronized void scheduleNext() {
        if ((active = tasks.poll()) != null) {
            executor.execute(active);
        }
    }
}

The execute method of SerialExecutor first wraps the submitted task r in a new task. The run() method of the wrapper does the following:

  1. It runs the original task by calling r.run().
  2. It then calls scheduleNext(), which polls the next wrapped task from the tasks queue and passes it to the underlying executor's execute method.

After wrapping and queueing the submitted task, execute checks whether a task is currently active. If active is null, it calls scheduleNext() to hand the first queued task to the underlying executor.

This is how SerialExecutor runs submitted tasks serially. scheduleNext() is protected, so unrelated code cannot call it directly to start a second task in parallel. And because every submitted task is wrapped, the underlying executor is asked to run the next task only after the previous one has finished.

What happens when execute is called several times in a row, for example from the same thread? Each call only wraps the task and adds it to the queue. The underlying executor.execute is invoked from execute only when active == null, so tasks still run one at a time. Because synchronized locks are reentrant, a wrapper that runs in the calling thread (for example on top of a DirectExecutor) can call scheduleNext() from inside execute without deadlocking.

The tasks queue acts as a buffer that holds submitted tasks until the underlying executor is ready for them, and synchronized protects the queue and the active field from corruption when several threads submit tasks concurrently.
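
The serial behavior can be observed with a small experiment. The sketch below is a compact copy of the SerialExecutor idea placed on top of a four-thread pool; the class name SerialExecutorDemo, the helper runInOrder, and the pool size are assumptions made for this illustration. Even though the pool could run tasks in parallel, the recorded order matches the submission order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SerialExecutorDemo {
    /** Compact variant of the SerialExecutor discussed in the text. */
    static final class SerialExecutor implements Executor {
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private final Executor executor;
        private Runnable active;

        SerialExecutor(Executor executor) {
            this.executor = executor;
        }

        public synchronized void execute(Runnable r) {
            tasks.offer(() -> {
                try {
                    r.run();
                } finally {
                    scheduleNext();   // hand over the next task only after this one finished
                }
            });
            if (active == null) {
                scheduleNext();
            }
        }

        private synchronized void scheduleNext() {
            if ((active = tasks.poll()) != null) {
                executor.execute(active);
            }
        }
    }

    /** Submits count numbered tasks and returns the order in which they actually ran. */
    static List<Integer> runInOrder(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(count);
        Executor serial = new SerialExecutor(pool);
        for (int i = 0; i < count; i++) {
            final int id = i;
            serial.execute(() -> {
                order.add(id);
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(order);
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));   // prints "[0, 1, 2, 3, 4]"
    }
}
```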

ExecutorService, another interface in the same package, extends Executor and is more widely used.
ThreadPoolExecutor is an Executor implementation class that application developers can extend.
The Executors class provides convenient factory methods for these implementations.

Memory consistency effects: actions in a thread prior to submitting a Runnable object to an Executor happen-before its execution begins, perhaps in another thread.

public interface Executor {
    /**
     * Executes the given command r at some time in the future. The command may run in a new thread,
     * in a pooled thread, or in the calling thread, at the discretion of the implementation.
     */
    void execute(Runnable r);
}

ExecutorService

ExecutorService is a subinterface of Executor. It adds methods to manage termination and methods that produce a Future for tracking the progress of one or more asynchronous tasks.
An ExecutorService can be shut down, after which it rejects new tasks.
Once an ExecutorService has terminated, it has no tasks running, no tasks waiting to run, and no new tasks can be submitted. An unused ExecutorService should be shut down so that its resources can be reclaimed.

The submit method extends Executor.execute by creating and returning a Future that can be used to cancel the task or wait for it to complete.
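
As a minimal sketch of submit, the following example hands a value-returning task to a single-thread executor and blocks on the returned Future. The class name SubmitDemo and the method sumOnPool are hypothetical, and the choice of Executors.newSingleThreadExecutor is just one convenient assumption.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitDemo {
    /** Computes 1 + 2 + ... + n on a pool thread and waits for the result. */
    static int sumOnPool(int n) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The lambda returns a value, so it is treated as a Callable<Integer>.
            Future<Integer> future = pool.submit(() -> {
                int sum = 0;
                for (int i = 1; i <= n; i++) {
                    sum += i;
                }
                return sum;
            });
            return future.get();   // blocks until the task has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();       // allow the pool's resources to be reclaimed
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOnPool(100));   // prints "5050"
    }
}
```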

invokeAny and invokeAll are the most commonly used forms of bulk execution: they execute a collection of tasks and then wait for at least one, or all, of them to complete.

Application developers can use the ExecutorCompletionService class to write customized variants of these methods.
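
The two bulk methods can be compared side by side. In the sketch below, squares uses invokeAll and collects every result in task order, while firstSuccess uses invokeAny with one task that always fails and one that succeeds. The class name BulkInvokeDemo, both method names, and the pool sizes are assumptions made for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BulkInvokeDemo {
    /** invokeAll: waits for all tasks; the Futures come back in the order of the task list. */
    static List<Integer> squares(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer x : inputs) {
                tasks.add(() -> x * x);
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : pool.invokeAll(tasks)) {
                results.add(f.get());   // every Future is already done at this point
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    /** invokeAny: returns the result of a task that completed without throwing. */
    static String firstSuccess() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(() -> { throw new IllegalStateException("always fails"); });
            tasks.add(() -> "ok");
            return pool.invokeAny(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> inputs = new ArrayList<>();
        inputs.add(1);
        inputs.add(2);
        inputs.add(3);
        System.out.println(squares(inputs));   // prints "[1, 4, 9]"
        System.out.println(firstSuccess());    // prints "ok"
    }
}
```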

The Executors class provides factory methods for creating executor services.

The following code sketches a simple network server in which threads from a thread pool service incoming requests. It uses the preconfigured Executors.newFixedThreadPool factory method:

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NetworkService implements Runnable {
    private final ExecutorService pool;
    private final ServerSocket serverSocket;

    public NetworkService(int port, int poolSize) throws IOException {
        serverSocket = new ServerSocket(port);
        pool = Executors.newFixedThreadPool(poolSize);
    }

    public void run() {
        try {
            for (;;) {
                // Each accepted connection is handled by a pool thread.
                pool.submit(new Handler(serverSocket.accept()));
            }
        } catch (Exception e) {
            pool.shutdown();
        }
    }
}

class Handler implements Runnable {
    final Socket socket;

    public Handler(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        // read and service request on socket
    }
}

The following method shuts down an ExecutorService in two phases so that long-running tasks can still be cancelled. It first calls shutdown to reject new tasks, and then, if necessary, calls shutdownNow to cancel the tasks that are taking too long.

void shutDownAndAwaitTermination(ExecutorService pool) {
    pool.shutdown();    // stop accepting new tasks
    try {
        // Give running and queued tasks 60 seconds to finish.
        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
            // Tasks are still unfinished after 60 seconds: cancel running and queued tasks.
            pool.shutdownNow();
            // Wait another 60 seconds for the tasks to respond to the cancellation.
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                System.err.println("pool did not terminate");    // some tasks ignored the cancellation
            }
        }
    } catch (InterruptedException e) {
        pool.shutdownNow();    // cancel again if the current thread was interrupted while waiting
        Thread.currentThread().interrupt();    // preserve the interrupt status of the current thread
    }
}

Memory consistency effects: actions in a thread prior to submitting a Runnable or Callable task to an ExecutorService happen-before any actions taken by that task, which in turn happen-before the result is retrieved via Future.get().

public interface ExecutorService extends Executor {
    // Initiates an orderly shutdown: no new tasks are accepted, but previously submitted tasks are executed.
    // Does not block waiting for submitted tasks to complete; use awaitTermination() for that.
    // Has no additional effect if the ExecutorService is already shut down.
    // Throws SecurityException if a security manager exists and denies the caller permission.
    void shutdown();

    // Attempts to stop all actively executing tasks and halts the processing of waiting tasks.
    // Returns the list of tasks that were awaiting execution.
    List<Runnable> shutdownNow();

    // Returns true if this executor has been shut down.
    boolean isShutdown();

    // Returns true if all tasks have completed following shutdown.
    // Never returns true unless shutdown() or shutdownNow() was called first.
    boolean isTerminated();

    // Blocks the current thread until all tasks have completed after a shutdown request,
    // the timeout elapses, or the current thread is interrupted, whichever happens first.
    // Returns true if the executor terminated and false if the timeout elapsed first;
    // throws InterruptedException if interrupted while waiting.
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    // Submits a value-returning task and returns a Future representing its pending result.
    // Future.get() blocks until the task completes and then returns the task's result.
    // To block immediately for the result, use: exec.submit(aCallable).get();
    <T> Future<T> submit(Callable<T> task);

    // As above, but Future.get() returns the given result on successful completion.
    <T> Future<T> submit(Runnable r, T result);

    // As above, but Future.get() returns null on successful completion.
    Future<?> submit(Runnable r);

    // Executes the given tasks and blocks until all of them complete, then returns a List of Futures
    // holding their status and results. isDone() is true for every element of the returned list.
    // A completed task may have terminated normally or by throwing an exception.
    // The results are undefined if the tasks collection is modified while this call is in progress.
    // The returned Futures are in the same order as produced by the iterator of tasks.
    // If the calling thread is interrupted while waiting, InterruptedException is thrown and unfinished tasks are cancelled.
    // Throws NullPointerException if tasks or any of its elements is null.
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    // As above, but tasks that have not completed when the timeout expires are cancelled.
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
        throws InterruptedException;

    // Executes the given tasks and blocks until one of them completes successfully (without throwing),
    // then returns its result.
    // On normal or exceptional return, tasks that have not completed are cancelled.
    // Throws NullPointerException if tasks or any of its elements is null, and IllegalArgumentException if tasks is empty.
    // Throws ExecutionException if no task completes successfully.
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
}

Callable

Callable represents a task that returns a result and may throw a checked exception.
Callable is similar to Runnable in that both are designed for tasks that may be executed by another thread. A Runnable, however, cannot return a result and cannot throw a checked exception.
The Executors class provides utility methods for converting other common forms of tasks into Callable instances.

@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return the computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Executors contains the following adapter, which converts a Runnable instance into a Callable:

public class Executors {
    /**
     * A Runnable usually has to be converted into a Callable before it can be executed as one.
     * When no return value is needed, the adapter can be used like this:
     * Callable<?> callable = new RunnableAdapter<Void>(aRunnable, null);
     * Void is the placeholder class for the keyword void; void itself is a keyword, not one of Java's eight primitive types.
     */
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;

        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }

        public T call() {
            task.run();
            return result;
        }
    }
}
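
Application code does not instantiate RunnableAdapter directly; it goes through the public factory method Executors.callable(Runnable, T). The sketch below shows that conversion. The class name CallableAdapterDemo and the helper runAsCallable are hypothetical names used only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class CallableAdapterDemo {
    /** Wraps the Runnable in a Callable, runs it in the calling thread, and returns the fixed result. */
    static <T> T runAsCallable(Runnable task, T result) {
        Callable<T> callable = Executors.callable(task, result);
        try {
            return callable.call();   // runs task.run(), then returns result
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        String result = runAsCallable(counter::incrementAndGet, "done");
        System.out.println(result + " " + counter.get());   // prints "done 1"
    }
}
```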

Future

A Future represents the result of an asynchronous computation.
It provides methods to wait for the computation to complete and retrieve its result (get), to cancel the computation (cancel), to check whether the computation is complete (isDone), and to check whether it was cancelled (isCancelled).
Once a computation has completed, it cannot be cancelled.

public interface Future<V> {
    /**
     * Attempts to cancel the task. The attempt fails and the method returns false if the task has
     * already completed or has already been cancelled.
     * If the task is already running, mayInterruptIfRunning determines whether the thread executing
     * the task is interrupted.
     * After this method returns, subsequent calls to isDone() return true.
     * If this method returned true, subsequent calls to isCancelled() also return true.
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /** Returns true if this task was cancelled before it completed normally. */
    boolean isCancelled();

    /** Returns true if the task completed, whether by normal termination, an exception, or cancellation. */
    boolean isDone();

    /**
     * Waits if necessary for the task to complete, then returns its result.
     * @return the computed result
     * @throws CancellationException if the task was cancelled
     * @throws ExecutionException if the task threw an exception
     * @throws InterruptedException if the current thread was interrupted while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits at most the given time for the task to complete, then returns its result.
     * @throws CancellationException if the task was cancelled
     * @throws ExecutionException if the task threw an exception
     * @throws InterruptedException if the current thread was interrupted while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
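
The interplay of these methods is easiest to see on a task that never finishes by itself. In the following sketch the task blocks on a latch that is never released, so the timed get times out, cancel(true) succeeds, and a later get throws CancellationException. The class name FutureCancelDemo, the method observe, and the 50 ms timeout are assumptions chosen for this illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureCancelDemo {
    /** Returns a short log of what the Future methods reported. */
    static String observe() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch never = new CountDownLatch(1);   // never counted down
        StringBuilder log = new StringBuilder();
        try {
            Future<String> f = pool.submit(() -> {
                never.await();                          // blocks until interrupted
                return "unreachable";
            });
            try {
                f.get(50, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                log.append("timeout;");
            }
            log.append("cancel=").append(f.cancel(true)).append(';');
            log.append("isCancelled=").append(f.isCancelled()).append(';');
            log.append("isDone=").append(f.isDone()).append(';');
            try {
                f.get();
            } catch (CancellationException e) {
                log.append("get=CancellationException");
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return log.toString();
    }

    public static void main(String[] args) {
        // prints "timeout;cancel=true;isCancelled=true;isDone=true;get=CancellationException"
        System.out.println(observe());
    }
}
```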

RunnableFuture

A RunnableFuture is a Future that is also Runnable.
Successful execution of its run method completes the Future and makes its result available.

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /** Sets this Future to the result of its computation unless it has been cancelled. */
    void run();
}

FutureTask

FutureTask is a cancellable asynchronous computation. It provides a base implementation of Future, with methods to start and cancel the computation, to check whether it is complete, and to retrieve its result.
The result can be retrieved only after the computation has completed; until then, get blocks the calling thread.
Once the computation has completed, it cannot be restarted or cancelled, unless it is run with runAndReset.

FutureTask can wrap a Runnable or a Callable. Because FutureTask implements Runnable, it can be submitted to an Executor for execution.
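
Because FutureTask is a Runnable, it can also be passed straight to a Thread. The following minimal sketch wraps a Callable in a FutureTask, runs it on a new thread, and reads the result in the calling thread; the class name FutureTaskDemo and the method computeOnNewThread are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class FutureTaskDemo {
    /** Runs a small computation on a new thread and waits for its result. */
    static int computeOnNewThread() {
        FutureTask<Integer> task = new FutureTask<Integer>(() -> 6 * 7);
        new Thread(task).start();    // the new thread calls task.run()
        try {
            return task.get();       // the calling thread blocks until run() has set the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(computeOnNewThread());   // prints "42"
    }
}
```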

public class FutureTask<V> implements RunnableFuture<V> {
    private volatile int state;    // current run state of the task
    private static final int NEW = 0;
    private static final int COMPLETING = 1;
    private static final int NORMAL = 2;
    private static final int EXCEPTIONAL = 3;
    private static final int CANCELLED = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED = 6;
    /**
     * state leaves NEW and reaches a terminal state only through set, setException, or cancel.
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     * During completion, state may take the transient values COMPLETING or INTERRUPTING.
     */
    /**
     * Revision note: earlier versions used AbstractQueuedSynchronizer (AQS) to synchronize threads blocked in get().
     * The current design updates state with CAS to track completion, and keeps the threads
     * blocked in get() on a simple Treiber stack.
     */

    /** The underlying callable; set to null after the task has run. */
    private Callable<V> callable;
    /** The result to return, or the exception to throw, from get(). */
    private Object outcome;    // not volatile; protected by reads and writes of state

    /** The thread running the callable; CASed during run(). */
    private volatile Thread runner;    // on entering run(), the current thread CASes itself into runner
    /** Treiber stack of threads waiting in get(). */
    private volatile WaitNode waiters;    // a Treiber stack is a lock-free concurrent stack built on CAS

    /**
     * Returns the result, or throws the exception, for a completed task.
     * @param s the completed state value
     */
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    /** Creates a FutureTask that will, upon running, execute the given callable. */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Creates a FutureTask that will, upon running, execute the given runnable, and arranges for
     * get to return the given result on successful completion.
     * If no particular result is needed, use: Future<?> f = new FutureTask<Void>(runnable, null)
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        // Cancellation proceeds only if state is NEW and the CAS on state succeeds; otherwise return false.
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                // Interrupt the runner, then set the final state to INTERRUPTED.
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    /** @throws CancellationException if the task was cancelled */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /** @throws CancellationException {@inheritDoc} */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    ... }
    /**
     * Stores v in outcome as the result of the task, unless this FutureTask has already been set or cancelled.
     * run calls this method when the computation completes successfully.
     */
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

    /**
     * Stores t in outcome as the failure cause of the task, unless this FutureTask has already been set or cancelled.
     * run calls this method when the computation fails with an exception.
     */
    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);    // call() threw: store the exception in outcome
                }
                if (ran)
                    set(result);    // call() returned normally: store the result in outcome
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    /**
     * Executes the computation without setting its result, and then resets this future to its initial state.
     * Returns false if the computation throws an exception or is cancelled.
     * Intended for tasks that intrinsically execute more than once.
     * @return {@code true} if successfully run and reset
     */
    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }
}

In the FutureTask class, the methods inherited indirectly from the Future interface (get, cancel, isDone, isCancelled) are client-side methods, called by client threads.
The method inherited indirectly from the Runnable interface (run) is called by the thread that actually executes the task, for example a worker thread in a thread pool.

So how does FutureTask coordinate these two kinds of threads? For a client thread, the general flow is as follows:

  • First, the client thread wraps the real task, a Runnable instance r, in a FutureTask by calling the constructor FutureTask(Runnable r, V v).
  • Next, the client thread submits this FutureTask to a thread pool, or creates a new thread itself to run it (FutureTask implements Runnable): new Thread(futureTask).start();.
  • After the FutureTask has been handed to the thread pool or the new thread, that thread calls run() on the FutureTask instance at some later point; the exact time cannot be predicted.
  • Then the client thread that created and submitted the task usually calls the client-side methods of the FutureTask (get, cancel, isDone, isCancelled) to wait for the task to complete, retrieve the result of run(), or cancel the task.
  • When a client thread calls futureTask.get() before the task has completed, get() adds the thread to the waiters stack and blocks it with LockSupport.park(this) or LockSupport.parkNanos(this, nanos). The thread stays blocked until the executing thread finishes the task and wakes the waiting threads with LockSupport.unpark(t), until the client thread itself is interrupted with t.interrupt(), or until another client thread calls cancel() on the task.
  • When a client thread calls futureTask.cancel(true), it calls runner.interrupt() on the thread that is executing the task to interrupt the execution, and then wakes all threads in waiters that are blocked in get().

For the thread that actually executes the task:

  • The executing thread calls the run method of the futureTask.
  • Inside run, it first sets the runner field to the current thread: UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()).
  • Next, it calls the call method of the wrapped Callable (which in turn calls the run method of the Runnable r passed to the constructor) to execute the real task logic.
  • Finally, whether the task completed normally or threw an exception, it calls LockSupport.unpark(t) for every thread in waiters that is blocked in get().

The above briefly describes how the client thread that submits a task and the thread that executes it coordinate. The coordination relies on JDK primitives such as volatile fields with UNSAFE.compareAndSwapXXX, LockSupport.parkNanos(this, nanos), LockSupport.park(this), LockSupport.unpark(t), t.interrupt(), and Thread.yield(). The key points are:

  • volatile Thread runner; // The thread that actually executes the task. It prevents several threads from executing the task at the same time: every thread that enters futureTask.run() first executes if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return;. If runner is not null, another thread is already executing the task and the current thread returns immediately.
  • volatile int state; // The state of the task, also updated with UNSAFE.compareAndSwapInt.
  • volatile WaitNode waiters; // When a client thread t calls get() and the task has not completed, t is wrapped in a WaitNode q with q.thread = t, pushed onto the head of the waiters list with UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q), and then blocked.
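
The head insertion into waiters is the push operation of a Treiber stack. The sketch below shows the same CAS retry loop in isolation. It is not the JDK code: it swaps in AtomicReference for the Unsafe calls used by FutureTask, and the class name TreiberStackSketch, its Node type, and the drain method are hypothetical names for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class TreiberStackSketch {
    /** Singly linked node, comparable to WaitNode but holding a name instead of a thread. */
    static final class Node {
        final String name;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    private final AtomicReference<Node> head = new AtomicReference<>();

    /** Lock-free push: link the new node to the current head, then CAS the head; retry on contention. */
    void push(String name) {
        Node node = new Node(name);
        do {
            node.next = head.get();
        } while (!head.compareAndSet(node.next, node));
    }

    /** Detaches the whole stack in one atomic step and returns the names, most recently pushed first. */
    List<String> drain() {
        List<String> names = new ArrayList<>();
        for (Node n = head.getAndSet(null); n != null; n = n.next) {
            names.add(n.name);
        }
        return names;
    }

    public static void main(String[] args) {
        TreiberStackSketch stack = new TreiberStackSketch();
        stack.push("a");
        stack.push("b");
        stack.push("c");
        System.out.println(stack.drain());   // prints "[c, b, a]"
        System.out.println(stack.drain());   // prints "[]"
    }
}
```

In FutureTask the drained nodes hold threads, and waking them means calling LockSupport.unpark on each one.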

FutureTask implements RunnableFuture, so it is itself a Runnable and can be submitted and executed through Executor.execute.
Its run() implementation has the form: run() { ... result = callable.call(); ... }
Since FutureTask is a Runnable, it must hold a task to execute; that task is stored in the member variable Callable<V> callable.
FutureTask has to track the state of the task; this is done by the member variable volatile int state.
Blocking until the task finishes and then retrieving its result is implemented with a singly linked list, volatile WaitNode waiters. A thread that calls get() pushes itself onto the head of waiters with CAS and then blocks in LockSupport.park(). It stays blocked until run or cancel calls LockSupport.unpark(t) for every waiting thread in waiters.
FutureTask defines the member variable Object outcome to store either the computed result or the exception thrown while run was executing.
It also defines volatile Thread runner to record the thread that is executing the task.

run() is called when an Executor executes the task:

  1. First, the task must be in state NEW. Any other state means the task has already completed or been cancelled.
  2. Next, UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()) sets runner to the current thread. Several threads may compete to run the same task, so the field is volatile and is updated with CAS.
  3. Next, V v = callable.call(); executes the real computation.
  4. If call() completes normally, set(v) is called. It first executes compareAndSwapInt(this, stateOffset, NEW, COMPLETING), then outcome = v, then putOrderedInt(this, stateOffset, NORMAL), and finally calls finishCompletion(), which traverses waiters and wakes every thread blocked waiting for the result.
  5. If call() throws an exception e, setException(e) is called. It first executes compareAndSwapInt(this, stateOffset, NEW, COMPLETING), then outcome = e, then putOrderedInt(this, stateOffset, EXCEPTIONAL), and finally calls finishCompletion() to wake every thread in waiters that is waiting for the result.

cancel(mayInterruptIfRunning) is called by other threads as the business logic requires:

  1. First, the task must be in state NEW; otherwise cancel returns false.
  2. Next, compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED) updates the state.
  3. Next, if mayInterruptIfRunning is true, it executes try { if (runner != null) runner.interrupt(); } finally { putOrderedInt(this, stateOffset, INTERRUPTED); }. This sets the interrupt status of the executing thread, and the state becomes INTERRUPTED whether or not the interrupt call succeeds.
  4. Finally, finishCompletion() wakes every thread in the waiters list that is waiting for the result.

As the steps show, state can follow one of two paths while run() executes:

  • If callable.call() completes normally, set(v) moves state through NEW -> COMPLETING -> NORMAL.
  • If callable.call() throws an exception, setException(e) moves state through NEW -> COMPLETING -> EXCEPTIONAL.

cancel(mayInterruptIfRunning) also moves state along one of two paths:

  • If mayInterruptIfRunning == true: NEW -> INTERRUPTING -> INTERRUPTED
  • If mayInterruptIfRunning == false: NEW -> CANCELLED

Whether run() ends in set(v) or setException(e), both methods finally call finishCompletion() to wake all threads parked on this task. cancel() calls finishCompletion() as well.
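
The state field is private, but the transitions can be observed through the public API. The sketch below drives a FutureTask by hand in the calling thread: cancelling before run, cancelling after run, and running a task that throws. The class name FutureTaskStateDemo and its three methods are hypothetical names introduced for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;

class FutureTaskStateDemo {
    /** NEW -> CANCELLED: returns {cancel result, isCancelled, isDone, whether the body ran}. */
    static boolean[] cancelBeforeRun() {
        AtomicBoolean ran = new AtomicBoolean();
        FutureTask<Void> task = new FutureTask<Void>(() -> ran.set(true), null);
        boolean cancelled = task.cancel(false);
        task.run();   // state is no longer NEW, so run() returns immediately
        return new boolean[] {cancelled, task.isCancelled(), task.isDone(), ran.get()};
    }

    /** NEW -> COMPLETING -> NORMAL: returns {cancel result, isCancelled, isDone}. */
    static boolean[] cancelAfterRun() {
        FutureTask<String> task = new FutureTask<String>(() -> "v");
        task.run();   // executes the callable in the calling thread
        return new boolean[] {task.cancel(true), task.isCancelled(), task.isDone()};
    }

    /** NEW -> COMPLETING -> EXCEPTIONAL: get() wraps the stored exception in ExecutionException. */
    static String failureMessage() {
        FutureTask<String> task = new FutureTask<String>(() -> {
            throw new IllegalArgumentException("boom");
        });
        task.run();
        try {
            task.get();
            return "no exception";
        } catch (ExecutionException e) {
            return e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(cancelBeforeRun()));   // prints "[true, true, true, false]"
        System.out.println(Arrays.toString(cancelAfterRun()));    // prints "[false, false, true]"
        System.out.println(failureMessage());                     // prints "boom"
    }
}
```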

get() calls awaitDone(). In that method the calling thread adds itself to the waiters list and then blocks in park(this) until run() or cancel() finishes and wakes the waiting threads.

isDone() returns true as soon as state != NEW. A state other than NEW means either that call() has finished, so get() no longer blocks on the computation, or that the task was cancelled.

Copyright notice
This article was written by [Li Siwei]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/04/202204210600380584.html