
ThreadPoolExecutor thread pool source code analysis


1. Background

   The previous article presented an overall view of the Executor interface. From it we know that the top-level implementation of the Executor framework is the ThreadPoolExecutor class, and that the factory methods provided by Executors (newScheduledThreadPool, newFixedThreadPool, newCachedThreadPool) are essentially just calls to the ThreadPoolExecutor constructor with different parameters. By passing in different parameters you can construct thread pools for different application scenarios. So how is this implemented underneath? This article walks through how ThreadPoolExecutor runs a thread pool.

   The java.util.concurrent.ThreadPoolExecutor class is one of the most central classes in Java's thread pool framework, so to understand thread pools in Java thoroughly you must first understand this class. Let's look at the concrete implementation of the ThreadPoolExecutor class in its source code.

  The ThreadPoolExecutor class provides four constructors:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory);

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler);

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler);
    ...
}

From the code above we can see that ThreadPoolExecutor extends the AbstractExecutorService class and provides four constructors. In fact, by looking at the source of each constructor, you will find that the first three do their initialization by calling the fourth.

The signature of the full ThreadPoolExecutor constructor is:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 
  • corePoolSize: the size of the core pool. This parameter has a lot to do with how the thread pool works, as described later. After the thread pool is created, by default there are no threads in it; the pool waits for tasks to arrive before creating threads to execute them, unless prestartAllCoreThreads() or prestartCoreThread() is called. As the names of these two methods suggest, they pre-create threads: before any task arrives they create corePoolSize threads, or one thread, respectively.
  • By default, after the thread pool is created the number of threads in it is 0. When a task arrives, a thread is created to execute it, and once the number of threads in the pool reaches corePoolSize, newly arriving tasks are placed in the cache queue.
  • maximumPoolSize: the maximum number of threads in the thread pool. This is also a very important parameter; it indicates the largest number of threads the pool may create.
  • keepAliveTime: how long a thread may stay idle before it is terminated. By default, keepAliveTime only takes effect when the number of threads in the pool is greater than corePoolSize: if such a thread stays idle for keepAliveTime, it is terminated, until the number of threads in the pool no longer exceeds corePoolSize. If allowCoreThreadTimeOut(boolean) is called, however, keepAliveTime also applies when the number of threads is not greater than corePoolSize, until the number of threads in the pool drops to 0.
  • unit: the time unit of keepAliveTime. The TimeUnit class provides 7 static values:
 TimeUnit.DAYS;          // days
 TimeUnit.HOURS;         // hours
 TimeUnit.MINUTES;       // minutes
 TimeUnit.SECONDS;       // seconds
 TimeUnit.MILLISECONDS;  // milliseconds
 TimeUnit.MICROSECONDS;  // microseconds
 TimeUnit.NANOSECONDS;   // nanoseconds
  • workQueue: a blocking queue used to hold tasks waiting to be executed. The choice of this parameter is also important and can have a significant impact on how the thread pool behaves. Generally speaking, there are several options for the blocking queue: ArrayBlockingQueue and PriorityBlockingQueue are used less often; LinkedBlockingQueue and SynchronousQueue are the common choices. The thread pool's queuing strategy is closely tied to the BlockingQueue used. The main options are:
 ArrayBlockingQueue;     // bounded queue
 LinkedBlockingQueue;    // unbounded queue
 SynchronousQueue;       // a special queue: an insert succeeds only when another thread is waiting to take the element, so it effectively has no capacity
 PriorityBlockingQueue;  // unbounded priority queue
  • threadFactory: the thread factory, used to create new threads.
  • handler: the policy applied when a task is rejected. It has the following four values:
 ThreadPoolExecutor.AbortPolicy: discards the task and throws a RejectedExecutionException.
 ThreadPoolExecutor.DiscardPolicy: also discards the task, but does not throw an exception.
 ThreadPoolExecutor.DiscardOldestPolicy: discards the task at the head of the queue and then retries the submitted task (repeating this process).
 ThreadPoolExecutor.CallerRunsPolicy: the task is handled by the calling thread.

ThreadPoolExecutor is the underlying implementation behind the Executors factory class. The JDK documentation says: "Programmers are strongly encouraged to use the more convenient Executors factory methods Executors.newCachedThreadPool() (an unbounded thread pool with automatic thread reclamation), Executors.newFixedThreadPool(int) (a fixed-size thread pool), and Executors.newSingleThreadExecutor() (a single background thread), which preconfigure settings for the most common usage scenarios." Below is the source of several of these methods.

ExecutorService es = Executors.newFixedThreadPool(int nThreads): a fixed-size thread pool. As you can see, corePoolSize and maximumPoolSize are the same (in fact, as described later, maximumPoolSize is meaningless when an unbounded queue is used). What do the values chosen for keepAliveTime and unit mean? That idle threads are not kept alive at all beyond the fixed size. The BlockingQueue chosen is a LinkedBlockingQueue, whose key property is that it is unbounded; an unbounded LinkedBlockingQueue is used because there is no way to know how many tasks users will submit to the fixed number of threads, only that the number falls within an unbounded range.
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

  ExecutorService newSingleThreadExecutor(): a single-threaded executor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

ExecutorService newCachedThreadPool(): an unbounded thread pool with automatic thread reclamation

This implementation is interesting. First, it is essentially an unbounded thread pool, so maximumPoolSize is set to Integer.MAX_VALUE. Second, SynchronousQueue is chosen as the BlockingQueue, which may seem a little odd; put simply, in this queue every insert operation must wait for a corresponding remove operation by another thread.

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
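Beyond the Executors factory methods, you can of course call the full constructor yourself. Below is a minimal sketch that ties the seven parameters together; the pool sizes, queue capacity, and rejection policy are arbitrary illustrative choices, not values from the JDK source.

import java.util.concurrent.*;

public class ExplicitPoolDemo {
    public static void main(String[] args) {
        // all sizes and the queue capacity below are illustrative values
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,                                           // corePoolSize
                4,                                           // maximumPoolSize
                60L, TimeUnit.SECONDS,                       // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(100),       // workQueue: bounded, capacity 100
                Executors.defaultThreadFactory(),            // threadFactory
                new ThreadPoolExecutor.CallerRunsPolicy());  // handler: rejected tasks run in the caller thread
        pool.execute(() -> System.out.println("hello from " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}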

From the code of the ThreadPoolExecutor class given above, we know that ThreadPoolExecutor inherits from AbstractExecutorService, which is an abstract class that implements the ExecutorService interface.

public abstract class AbstractExecutorService implements ExecutorService 

  The ExecutorService in turn inherits from the Executor interface

public interface ExecutorService extends Executor 

Executor

|-- ExecutorService 
  |-- AbstractExecutorService 
    |-- ThreadPoolExecutor

Let's look at the Executor interface itself.

public interface Executor {
    void execute(Runnable command);
}

At this point, you should understand the relationship between ThreadPoolExecutor, AbstractExecutorService, ExecutorService, and Executor.

  Executor is the top-level interface. It declares only one method, execute(Runnable), which returns void and takes a Runnable parameter; as the name suggests, it is used to execute the task you pass in.

   The ExecutorService interface extends Executor and declares a number of additional methods, such as submit, invokeAll, invokeAny, and shutdown.
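As a quick illustration of these additions, here is a minimal sketch; the task bodies and the pool size are arbitrary illustrative values.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class ExecutorServiceDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        List<Callable<String>> tasks = Arrays.asList(() -> "first", () -> "second");

        List<Future<String>> all = pool.invokeAll(tasks);   // runs all tasks and waits for all of them
        for (Future<String> f : all)
            System.out.println(f.get());

        String any = pool.invokeAny(tasks);                 // returns the result of one task that completed successfully
        System.out.println(any);

        pool.shutdown();                                     // stop accepting new tasks
    }
}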

   The abstract class AbstractExecutorService implements the ExecutorService interface and provides implementations for essentially all of the methods declared in ExecutorService.

   ThreadPoolExecutor then inherits from the AbstractExecutorService class.

   There are several very important methods in the ThreadPoolExecutor class.

public void execute(Runnable command)

public <T> Future<T> submit(Callable<T> task)

public void shutdown()

public List<Runnable> shutdownNow()  // Returning unexecuted tasks

The execute() method is actually declared in Executor and implemented concretely in ThreadPoolExecutor. It is the core method of ThreadPoolExecutor: through it, a task can be submitted to the thread pool and left to the pool to execute.

  The submit() method is declared in ExecutorService and already has a concrete implementation in AbstractExecutorService; ThreadPoolExecutor does not override it. It is also used to submit tasks to the thread pool, but unlike execute() it can return the result of the task. If you look at the implementation of submit(), you will find that it still calls execute(); it simply uses a Future to obtain the task's result (Future has been covered in other articles).
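A minimal, purely illustrative sketch of the difference:

import java.util.concurrent.*;

public class SubmitVsExecute {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);

        pool.execute(() -> System.out.println("execute: returns nothing"));  // Runnable, void return

        Future<Integer> future = pool.submit(() -> 1 + 1);  // Callable wrapped in a Future
        System.out.println("submit: " + future.get());      // blocks until the task completes, prints 2

        pool.shutdown();
    }
}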

   shutdown() and shutdownNow() are used to shut down the thread pool.

2. Specific source code implementation

Before talking about the running process, the first thing to understand is the thread pool's state.

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

The ThreadPoolExecutor code defines the variables above: a volatile variable runState, together with several constants that represent states. runState indicates the current running state of the thread pool and takes one of the four constant values above:

RUNNING: the thread pool accepts new tasks and executes the tasks queued in the waiting queue.

SHUTDOWN: no new tasks are accepted, but the tasks already queued in the waiting queue continue to be executed. When shutdown() is called, the state changes from RUNNING -> SHUTDOWN.

STOP: no new tasks are accepted, the tasks queued in the waiting queue are not executed, and an attempt is made to interrupt the tasks in progress. When shutdownNow() is called, the state changes from (RUNNING or SHUTDOWN) -> STOP (a short sketch of both shutdown paths follows the list below).

TERMINATED: all threads in the thread pool have stopped running; otherwise it behaves the same as STOP.

  • When both the waiting queue and the thread pool are empty, the state changes from SHUTDOWN -> TERMINATED.
  • When the thread pool is empty, the state changes from STOP -> TERMINATED.
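A minimal sketch of the two shutdown paths; the task bodies and counts are arbitrary illustrative values.

import java.util.List;
import java.util.concurrent.*;

public class ShutdownDemo {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                try { Thread.sleep(200); } catch (InterruptedException ignored) { }
            });
        }
        // pool.shutdown() would move RUNNING -> SHUTDOWN: queued tasks still run, new ones are rejected.
        // shutdownNow() moves RUNNING (or SHUTDOWN) -> STOP: running tasks are interrupted,
        // and the tasks still waiting in the queue are returned to the caller.
        List<Runnable> neverStarted = pool.shutdownNow();
        System.out.println("tasks that never started: " + neverStarted.size());
    }
}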

2. Thread pool running tasks

2.1 Introduction to variables

Before explaining the running process, let's look at a few of the more important member variables in ThreadPoolExecutor:

private final BlockingQueue<Runnable> workQueue;               // task cache queue, holds pending tasks waiting for an idle worker thread to execute them
private final ReentrantLock mainLock = new ReentrantLock();    // lock held when updating poolSize, corePoolSize, maximumPoolSize, runState, and the workers set
private final HashSet<Worker> workers = new HashSet<Worker>(); // holds the worker threads currently in the pool
private volatile long keepAliveTime;                           // idle keep-alive time for threads beyond corePoolSize
private volatile boolean allowCoreThreadTimeOut;               // whether threads within corePoolSize also get an idle keep-alive time
private volatile int corePoolSize;                             // number of core threads
private volatile int maximumPoolSize;                          // maximum number of threads allowed in the pool
private volatile int poolSize;                                 // current number of threads in the pool
private volatile RejectedExecutionHandler handler;             // task rejection policy
private volatile ThreadFactory threadFactory;                  // thread factory, used to create new threads
private int largestPoolSize;                                   // largest number of threads the pool has ever held
private long completedTaskCount;                               // number of tasks that have completed

The key variables here are corePoolSize, maximumPoolSize, and workQueue, which together determine the strategy for creating threads in the pool. corePoolSize can be understood as the core size of the thread pool. As an example (assume corePoolSize equals 10 and maximumPoolSize equals 20), consider the analogy below; a short code sketch follows the list.

 There is a department with 10 (corePoolSize) workers. When a new task comes in, the leader assigns it to a worker, and each worker can only do one task at a time.
 When all 10 workers are busy, newly arriving tasks have to be put into a queue (workQueue) to wait.
 When the tasks pile up far faster than the workers can handle them, the leader comes up with a solution: borrow workers from other departments, the number calculated as (maximumPoolSize - corePoolSize), and assign the incoming tasks to the borrowed workers.
 If even that is not enough to keep up, it may be time to abandon some tasks (RejectedExecutionHandler).
 After a certain amount of time, when the tasks are all done and the workers are relatively idle, the 10 borrowed workers are returned (as determined by keepAliveTime).
 In other words, corePoolSize is the thread pool's working size, and maximumPoolSize is, in my view, a remedy for when the amount of work suddenly becomes too large.
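Here is a minimal code sketch of the same idea: 2 core workers, up to 4 in total, and a waiting queue of 2. All numbers are arbitrary illustrative values, and it is the default AbortPolicy that throws RejectedExecutionException once both the queue and the maximum pool size are full.

import java.util.concurrent.*;

public class PoolGrowthDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 5L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2));
        for (int i = 0; i < 8; i++) {
            try {
                pool.execute(() -> {
                    try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
                });
            } catch (RejectedExecutionException e) {
                System.out.println("rejected: queue full and maximumPoolSize reached");
            }
            System.out.println("poolSize=" + pool.getPoolSize()
                    + ", queued=" + pool.getQueue().size());
        }
        pool.shutdown();
    }
}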

2.2 Thread execution process

Let's look at an example from the previous post:

ExecutorService executor = Executors.newFixedThreadPool(3);

IntStream.range(0, 6).forEach(
    i -> executor.execute(() -> {
        String threadName = Thread.currentThread().getName();
        System.out.println("finished: " + threadName);
    })
);

The code above creates 6 tasks and throws them into the thread pool to run, each printing its thread name. The most central method here is execute(); although submit() can also run tasks, it too calls execute() under the hood, so understanding execute() is understanding the implementation principle.

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {   //1.
            if (runState == RUNNING && workQueue.offer(command)) {    //2.
                if (runState != RUNNING || poolSize == 0)   //3.
                    ensureQueuedTaskHandled(command);  //4.
            }
            else if (!addIfUnderMaximumPoolSize(command))  //5.
                reject(command); // is shutdown or saturated //6
        }
}

The logic of the code above looks a bit complicated, so let's go through it step by step, starting with position 1: if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) is an OR expression with two parts.

  1. First it checks whether the current number of threads is greater than or equal to the number of core threads. If so, it goes straight into the if block; otherwise it evaluates the second part.
  2. The second part, addIfUnderCorePoolSize(command), creates a new thread to execute the task when the thread count is below the core pool size (when the thread count is below corePoolSize, a new thread is created directly to run the task, regardless of whether there are currently idle threads in the pool). If creation fails, execution enters the if block; if it succeeds, execute() is done, because a thread was created successfully and the task is already running in the pool.

After entering the if block, look at position 2 above: if (runState == RUNNING && workQueue.offer(command))

  1. If the current thread pool state is RUNNING and the task is successfully placed in the waiting queue, execution goes straight into the inner if block.
  2. Otherwise, execution goes to position 5, if (!addIfUnderMaximumPoolSize(command)), which checks whether a new thread could be created to run the task (note: the new thread here is one of the "borrowed workers" described above, up to maximumPoolSize).
  3. If even the "borrowed workers" cannot handle it, the task rejection policy is applied.

Continuing into the inner if block at position 3, if (runState != RUNNING || poolSize == 0): since a new task has just been added to the waiting queue, this check guards against the case where another thread calls shutdown or shutdownNow to close the thread pool while the task is being added. If that has happened, ensureQueuedTaskHandled(command) is called as a remedial step for the newly queued task.


Next, let's look at the implementation of two key methods.

1. addIfUnderCorePoolSize

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

The lock is acquired first, since a change to the thread pool's state is involved. Then if (poolSize < corePoolSize && runState == RUNNING) is checked again. We already checked this once in execute(); checking again guards against other threads having added threads or called shutdown or shutdownNow in the meantime, so it acts as a double check. If it is true, t = addThread(firstTask) adds a new thread to execute the task. The addThread method is fairly simple: it creates a thread through the thread factory, wraps it in a Worker object, adds it to the workers set, and starts the thread. A Worker object can be seen as an object that owns a thread.

private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);
    if (t != null) {
        w.thread = t;
        workers.add(w);      // save the worker that will execute tasks
        int nt = ++poolSize;
        if (nt > largestPoolSize)
            largestPoolSize = nt;
        t.start();           // runs Worker.run(); the Worker class implements the Runnable interface
    }
    return t;
}

A word about the Worker object: it implements the Runnable interface, so you can think of it as a proxy for the submitted Runnable; ultimately its run method is executed. Also note the beforeExecute and afterExecute methods: they have no concrete implementation in ThreadPoolExecutor, and users can override them to collect statistics, for example the execution time of a task. afterExecute also takes a Throwable t parameter that can be used to log exception information, because exceptions thrown in worker threads are not caught elsewhere and need to be recorded in afterExecute. Doesn't this look a bit like a Spring aspect? The knowledge is all connected.
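As a minimal, purely illustrative sketch (not part of the JDK source), a subclass that uses these two hooks to time each task and log failures might look like this:

import java.util.concurrent.*;

public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();

    public TimingThreadPool(int core, int max, long keepAlive, TimeUnit unit,
                            BlockingQueue<Runnable> queue) {
        super(core, max, keepAlive, unit, queue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime());            // record the start time on the worker thread
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        long elapsed = System.nanoTime() - startNanos.get();
        if (t != null)
            System.err.println("task failed after " + elapsed + " ns: " + t);
        else
            System.out.println("task finished in " + elapsed + " ns");
        super.afterExecute(r, t);
    }
}

Now take a look at the Worker's run method: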

public void run() {
            try {
                hasRun = true;
                Runnable task = firstTask;
                firstTask = null;
                while (task != null || (task = getTask()) != null) {  //1
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);
            }
        }

Notice position 1: the loop keeps fetching tasks and executing them until there are none left. Every task except the first is obtained through getTask(), which is a method of ThreadPoolExecutor itself. We can guess that, since tasks are only stored in the task cache queue throughout this class, getTask() must take them from that queue.

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                 r = workQueue.poll();  // Fetching tasks
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                 // if the thread count exceeds the core pool size, or core threads may time out,
                 // fetch the task with a timed poll; null is returned if no task appears within keepAliveTime
                 r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                 r = workQueue.take();  //blocked fetch
            if (r != null)
                return r;
if (workerCanExit()) { // If no task is fetched, i.e. r is null, determine if the current worker can exit
                if (runState >= SHUTDOWN) // Wake up others
                     interruptIdleWorkers();    // Interrupt a worker in an idle state
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

There is a very clever design here. If we were designing a thread pool ourselves, we might have a task-dispatching thread that, upon finding an idle worker, takes a task from the task cache queue and hands it to that idle thread. That approach is not used here, because it would add the management of a dispatching thread and quietly increase the difficulty and complexity. Instead, the worker thread that executes tasks goes to the task cache queue itself to fetch tasks, since each Worker contains a thread field thread.
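To see why this keeps things simple, here is a stripped-down illustration of the "worker pulls its own tasks" pattern; this is not the JDK code, just a sketch of the idea.

import java.util.concurrent.*;

class SimpleWorker implements Runnable {
    private final BlockingQueue<Runnable> queue;   // shared task queue, the analogue of workQueue

    SimpleWorker(BlockingQueue<Runnable> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable task = queue.take();      // block until a task is available
                task.run();                        // execute it on this worker's own thread
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();    // exit when interrupted (e.g. on shutdown)
        }
    }
}

There is no dispatcher: each worker simply loops on the queue, which is essentially what Worker.run and getTask() do above.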

Let's take a look at another method, runTask():

private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                /*
                 * If pool is stopping ensure thread is interrupted;
                 * if not, ensure thread is not interrupted. This requires
                 * a double-check of state in case the interrupt was
                 * cleared concurrently with a shutdownNow -- if so,
                 * the interrupt is re-enabled.
                 */
         // if the pool is being stopped (runState >= STOP), make sure the current thread is interrupted
                if ((runState >= STOP ||(Thread.interrupted() && runState >= STOP)) && hasRun)
                    thread.interrupt();
                /*
                 * Track execution state to ensure that afterExecute
                 * is called only if task completed or threw
                 * exception. Otherwise, the caught runtime exception
                 * will have been thrown by afterExecute itself, in
                 * which case we don't want to call it again.
                 */
                boolean ran = false;
                beforeExecute(thread, task);
                try {
                 // execute the task
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }

One more method: workerDone, which is called when a worker thread finishes running (in the finally block of Worker.run above):

// record the number of completed tasks, remove the worker thread, and try to terminate the pool when poolSize reaches 0
void workerDone(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
        if (--poolSize == 0)
            tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

2. addIfUnderMaximumPoolSize

The implementation of this method is very similar to addIfUnderCorePoolSize; the only difference is that addIfUnderMaximumPoolSize runs when the number of threads in the pool has reached the core pool size and adding the task to the task queue has failed:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        return t != null;
    }

By this point, you should have a basic understanding of the entire process from the moment a task is submitted to the thread pool until it is executed. To summarize:

  1. First, be clear about the meaning of corePoolSize and maximumPoolSize.
  2. Second, know what the Worker is used for.
  3. Know the strategy for handling tasks after they are submitted to the thread pool, summarized in 4 points:
  • If the number of threads in the current thread pool is less than corePoolSize, a new thread is created to execute each arriving task.
  • If the number of threads in the current pool is >= corePoolSize, each arriving task is offered to the task cache queue. If it is added successfully, the task waits for an idle thread to take it out and execute it; if the addition fails (generally because the task cache queue is full), the pool tries to create a new thread to execute the task.
  • If the number of threads in the pool has already reached maximumPoolSize, the task rejection policy is applied.
  • If the number of threads exceeds corePoolSize and a thread stays idle longer than keepAliveTime, that thread is terminated, until the pool has no more than corePoolSize threads; if core threads are also allowed a keep-alive time (allowCoreThreadTimeOut), core threads idle longer than keepAliveTime are terminated as well.

My blog will soon move to and stay in sync with the Tencent Cloud+ community; you are invited to join: https://cloud.tencent.com/developer/support-plan?invite_code=2da2gtpfllwkk


