Java Concurrency: Inter-Thread Collaboration
In the last article we introduced the synchronized keyword, which effectively solves some common multithreading problems such as race conditions and memory visibility. We also showed that the keyword essentially combines acquiring and releasing a lock, and that every thread trying to acquire a lock it cannot get is blocked on the blocking queue of the corresponding object. The collaboration between threads described in this post relies on another queue attached to each object, the conditional queue: every thread that cannot continue because some condition is not yet satisfied waits there. The post covers three topics: the wait/notify methods, using them to solve the producer-consumer problem, and how the join method is implemented.
I. Understanding the wait/notify methods

These two methods are the protagonists of this post. They are defined in the root class Object.
public final void wait() throws InterruptedException;
public final native void wait(long timeout) throws InterruptedException;
public final native void notify();
public final native void notifyAll();
There are two wait methods. The one without parameters is equivalent to wait(0), which means waiting indefinitely, while the one with a parameter specifies the maximum time the thread waits, in milliseconds. The notify method wakes up one thread waiting on the object's conditional queue, while notifyAll wakes up all threads waiting on it. All of these methods must be called by a thread that holds the object's lock, otherwise an IllegalMonitorStateException is thrown. So when exactly do you call wait to make a thread wait on the conditional queue, and when do you call notify to release a thread from it?
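Before turning to that question, here is a minimal sketch of the calling convention for these methods. The class WaitNotifyHandshake and its members are hypothetical names chosen for illustration, not part of the JDK or of this post's later examples. It assumes the usual pattern: a condition flag guarded by the same lock that wait and notifyAll are called on, with wait placed inside a loop.

```java
/** Hypothetical demo class; all names are illustrative, not from the article. */
class WaitNotifyHandshake {
    private final Object lock = new Object();
    private boolean ready = false; // the condition, guarded by lock

    /** Waits on lock's conditional queue until signalReady() has run. */
    void awaitReady() throws InterruptedException {
        synchronized (lock) {
            while (!ready) {   // re-check the condition after every wakeup
                lock.wait();   // releases lock while waiting, reacquires it before returning
            }
        }
    }

    /** Makes the condition true and wakes every thread waiting on lock. */
    void signalReady() {
        synchronized (lock) {
            ready = true;
            lock.notifyAll();
        }
    }

    /** Returns true if calling wait() without holding the lock is rejected. */
    static boolean waitWithoutLockFails() {
        try {
            new Object().wait(1);
            return false;
        } catch (IllegalMonitorStateException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns true if a waiting thread finishes once the condition is signalled. */
    static boolean runHandshake() {
        WaitNotifyHandshake h = new WaitNotifyHandshake();
        Thread waiter = new Thread(() -> {
            try {
                h.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        h.signalReady();
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("wait() without lock rejected: " + waitWithoutLockFails());
        System.out.println("waiter released: " + runHandshake());
    }
}
```

Because the flag is checked under the lock, the sketch works whether signalReady runs before or after the waiter reaches wait: in the first case the waiter never waits at all.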
As we said, an object has one lock and two queues. Threads that fail to acquire the lock are blocked on the blocking queue. A thread that has acquired the lock but cannot proceed in the middle of its run because some condition is missing calls wait, which places it on the conditional queue, releases the lock, and gives up the CPU. Note that threads blocked on the blocking queue and threads waiting on the conditional queue are in different states. For example:
/* Define a thread class */
public class MyThread extends Thread {
    @Override
    public void run() {
        try {
            System.out.println(Thread.currentThread().getState());
            synchronized (this) {
                wait();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

/* Start the thread */
public static void main(String[] args) throws InterruptedException {
    Thread thread = new MyThread();
    thread.start();
    Thread.sleep(1000);
    System.out.println(thread.getState());
    System.out.println("main is out");
}
The output is:

RUNNABLE
WAITING
main is out

The main function starts a thread. The thread first prints its own state, then calls wait, which places it on the conditional queue of the thread object itself and gives up the CPU. The main function then prints the state of the thread again. The program does not exit, which shows that the child thread never finished: nothing ever notifies it. The state of a thread waiting on the conditional queue is WAITING, which is different from BLOCKED, the state of a thread blocked on the blocking queue. Note, however, that when notify or notifyAll releases a thread from the conditional queue, that thread has to compete for the object's lock like any other thread, and if it fails to get the lock it is blocked on the object's blocking queue again.
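The difference between the two states can also be observed side by side. The following is a sketch under stated assumptions: ThreadStateDemo, awaitState, and the thread names are hypothetical, and the polling helper is only a convenience for this demonstration. One thread waits on a lock's conditional queue, and a second thread is started while the main thread holds the same lock, so it ends up on the blocking queue.

```java
/** Hypothetical demo class; all names are illustrative, not from the article. */
class ThreadStateDemo {
    private final Object lock = new Object();
    private boolean released = false; // guarded by lock

    /** Polls until t reaches the wanted state or roughly five seconds pass. */
    static Thread.State awaitState(Thread t, Thread.State wanted) {
        long deadline = System.currentTimeMillis() + 5000;
        Thread.State state = t.getState();
        while (state != wanted && System.currentTimeMillis() < deadline) {
            Thread.yield();
            state = t.getState();
        }
        return state;
    }

    private void waitUntilReleased() {
        synchronized (lock) {
            while (!released) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }

    /** Returns { state of the waiting thread, state of the thread contending for the lock }. */
    Thread.State[] observe() {
        Thread waiter = new Thread(this::waitUntilReleased, "waiter");
        Thread contender = new Thread(() -> {
            synchronized (lock) {
                // nothing to do: this thread only needs to acquire the lock
            }
        }, "contender");

        waiter.start();
        Thread.State waiterState = awaitState(waiter, Thread.State.WAITING);

        Thread.State contenderState;
        synchronized (lock) { // possible because wait() released the lock
            contender.start();
            contenderState = awaitState(contender, Thread.State.BLOCKED);
            released = true;
            lock.notifyAll(); // let the waiter finish once we leave this block
        }
        try {
            waiter.join(5000);
            contender.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Thread.State[] { waiterState, contenderState };
    }

    public static void main(String[] args) {
        Thread.State[] states = new ThreadStateDemo().observe();
        System.out.println("on the conditional queue: " + states[0]);
        System.out.println("on the blocking queue:    " + states[1]);
    }
}
```

Unlike MyThread above, this sketch notifies the waiting thread at the end, so the program terminates.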
II. Using wait/notify to solve the producer-consumer problem

The producer-consumer problem is a classic problem from operating systems. Producers keep putting products into a warehouse, and consumers keep taking products out of it. When the warehouse is full, producers cannot put more products in, and when it is empty, consumers cannot take any out. Coordinating how the producer and consumer threads operate on the warehouse is the heart of this problem.
import java.util.ArrayDeque;

public class Repository {
    private final ArrayDeque<String> list;
    private final int limit; // Warehouse capacity

    public Repository(int limit) {
        this.limit = limit;
        list = new ArrayDeque<String>(this.limit);
    }

    // Deposit operation the warehouse offers to producers
    public synchronized void addGoods(String data) throws InterruptedException {
        while (list.size() == limit) { // the warehouse is full
            wait();
        }
        list.add(data);
        System.out.println("i produce a product:" + data);
        notifyAll();
    }

    // Take-out operation the warehouse offers to consumers
    public synchronized String getGoods() throws InterruptedException {
        while (list.isEmpty()) { // the warehouse is empty
            wait();
        }
        String result = list.poll();
        System.out.println("i consume a product:" + result);
        notifyAll();
        return result;
    }
}
We define a repository class that provides a method for producers to put products in and a method for consumers to take them out. The warehouse is simulated with a double-ended queue, and the limit parameter caps its capacity.
Consider the producer's deposit method. When a producer wants to put a product into the warehouse and the warehouse is already full, the current thread waits on the conditional queue until a slot becomes free. If the warehouse is not full, the product is put in and all threads waiting on the conditional queue are woken up (with a single producer, as here, these are consumer threads). Once a consumer thread is released from the conditional queue, it competes with the producer thread for the object lock again. After acquiring the lock it resumes from the point where it called wait, and because wait sits inside a while loop, it re-checks the condition before going on. The consumer's take-out method works the same way and is not repeated here.
public class Producer extends Thread {
    // The producer thread keeps producing products, waiting whenever the warehouse is full
    private final Repository repository;
    private int count = 0;

    public Producer(Repository r) {
        this.repository = r;
    }

    @Override
    public void run() {
        while (true) {
            try {
                repository.addGoods(String.valueOf(count));
                count++;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
We define a producer class that keeps producing products, using count as the product identifier.
public class Consumer extends Thread {
    // The consumer thread keeps taking products out, waiting whenever the warehouse is empty
    private final Repository repository;

    public Consumer(Repository r) {
        this.repository = r;
    }

    @Override
    public void run() {
        while (true) {
            try {
                String result = repository.getGoods();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
We define a consumer class that keeps taking products out of the warehouse.
public static void main(String[] args) throws InterruptedException {
    Repository repository = new Repository(20);
    Thread producer = new Producer(repository);
    Thread consumer = new Consumer(repository);
    producer.start();
    consumer.start();
    System.out.println("main thread is out");
}
Finally, we create a repository and make the producer and the consumer share it by passing it to their constructors. After the two threads are started, the program loops forever, printing the production and consumption operations.
We can see that the producer and consumer threads alternate their output. The consumer occasionally lags behind the producer but never overtakes it, because a product can only be taken out after it has been produced. This is the classic producer-consumer problem, and implementing it gives a deeper understanding of the wait/notify operations.
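Since the program above never terminates, its behavior is hard to check mechanically. The following is a rough sketch of a finite variant, assuming a compact stand-in for Repository that uses the same wait/notifyAll pattern. BoundedRunDemo, Buffer, maxSeen, and run are hypothetical names introduced only for this illustration. It produces a fixed number of items and records the largest size the buffer ever reached, so both the ordering and the capacity limit can be verified.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo class; all names are illustrative, not from the article. */
class BoundedRunDemo {
    /** Compact stand-in for Repository, using the same wait/notifyAll pattern. */
    static final class Buffer {
        private final ArrayDeque<Integer> items = new ArrayDeque<>();
        private final int limit;
        private int maxSeen = 0; // largest size observed, for checking the capacity limit

        Buffer(int limit) {
            this.limit = limit;
        }

        synchronized void put(int value) throws InterruptedException {
            while (items.size() == limit) { // full: wait for a consumer
                wait();
            }
            items.add(value);
            maxSeen = Math.max(maxSeen, items.size());
            notifyAll();
        }

        synchronized int take() throws InterruptedException {
            while (items.isEmpty()) { // empty: wait for a producer
                wait();
            }
            int value = items.poll();
            notifyAll();
            return value;
        }

        synchronized int maxSeen() {
            return maxSeen;
        }
    }

    /** Produces 0..count-1 on a second thread and returns the values in consumption order. */
    static List<Integer> run(Buffer buffer, int count) {
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    buffer.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop producing when interrupted
            }
        });
        producer.start();

        List<Integer> consumed = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                consumed.add(buffer.take());
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        Buffer buffer = new Buffer(3);
        List<Integer> consumed = run(buffer, 20);
        System.out.println("consumed: " + consumed);
        System.out.println("largest buffer size seen: " + buffer.maxSeen());
    }
}
```

With one producer and one consumer, the consumption order matches the production order because the queue is first-in, first-out, and the recorded maximum size never exceeds the limit passed to the constructor.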
III. Implementation principle of the join method

Internally, the join method uses the same wait/notify mechanism introduced above.
public final void join() throws InterruptedException {
    join(0);
}

public final synchronized void join(long millis) throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        while (isAlive()) {
            wait(0);
        }
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}
There are two methods, and the core one is the join method that takes an argument. It distinguishes three cases. If millis is less than 0, it throws an exception. If millis equals 0, it waits indefinitely. This second case deserves a closer look:
if (millis == 0) {
    while (isAlive()) {
        wait(0);
    }
}

Thread thread = new MyThread();
thread.start();
thread.join();
Of these two small pieces of code, the first is the JDK implementation for millis equal to 0, and the second is the usual way we call join. Since join is declared synchronized, the main thread must first acquire the lock of the thread object when it calls the method on that object.
Inside join, when millis equals 0, the wait(0) method is called as long as the thread object is alive, placing the current thread (main) on the thread object's conditional queue and releasing the lock. When the thread finishes executing, the JVM calls notifyAll on the thread object to release all threads waiting on its conditional queue. The main thread is woken up, finds that isAlive() is now false, and returns, which is how the main thread waits for the other thread to end. The case where millis is greater than 0 works on the same principle, except that wait(long timeout) is called with the remaining time, so I won't go into it here.
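To make the mechanism concrete, here is a minimal sketch that reproduces the behavior of join from the outside by waiting on the thread object directly. JoinViaWaitDemo, awaitTermination, and sleepQuietly are hypothetical names introduced for this illustration. The sketch relies on the behavior described above, namely that notifyAll is invoked on a thread object when the thread terminates. It is meant only to illustrate the principle: the Thread documentation recommends that applications not call wait, notify, or notifyAll on Thread instances, so real code should simply call join.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical demo class; all names are illustrative, not from the article. */
class JoinViaWaitDemo {
    /**
     * Mirrors the logic of Thread.join(long): waits until target has terminated
     * or millis have elapsed (0 means wait indefinitely).
     * Returns true if the thread terminated, false on timeout.
     */
    static boolean awaitTermination(Thread target, long millis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + millis;
        synchronized (target) {
            while (target.isAlive()) {
                if (millis == 0) {
                    target.wait(); // woken by the notifyAll issued when target terminates
                } else {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return false; // timed out while the thread is still alive
                    }
                    target.wait(remaining);
                }
            }
        }
        return true;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns true if the caller resumed only after the worker had finished. */
    static boolean waitsForWorker() {
        AtomicBoolean done = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            sleepQuietly(200);
            done.set(true);
        });
        worker.start();
        try {
            return awaitTermination(worker, 0) && done.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Returns true if a short timeout expires while a slow worker is still running. */
    static boolean timesOutOnSlowWorker() {
        Thread slow = new Thread(() -> sleepQuietly(10_000));
        slow.setDaemon(true);
        slow.start();
        try {
            return !awaitTermination(slow, 50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            slow.interrupt(); // let the slow worker exit early
        }
    }

    public static void main(String[] args) {
        System.out.println("waited for worker: " + waitsForWorker());
        System.out.println("timed out on slow worker: " + timesOutOnSlowWorker());
    }
}
```

The isAlive() check inside the loop is what makes the wait safe: if the thread has already terminated before the caller gets the lock, the loop body never runs and no notification is needed.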
This post focused on a collaboration mechanism between threads: using the wait/notify methods to coordinate different threads. Implementing the classic producer-consumer model deepened our understanding of these two methods, and finally we studied the join method of Thread from the source code. At its core, join uses wait/notify to coordinate the main thread and the thread it waits for. If anything in this summary is wrong or missing, please point it out.