Java Concurrency: CyclicBarrier (Synchronizing at a Rendezvous Point)
The CyclicBarrier class is a synchronization helper class similar to CountDownLatch, with one important difference: once the waiting threads have been released, the barrier resets and can be used again, which is what "cyclic" refers to. It makes a group of threads block at a barrier (also called a synchronization point) until the last thread of the group arrives; the barrier then opens and all the blocked threads continue their work, as shown in the diagram below.
image.png
A CyclicBarrier acts as a barrier placed at whatever point in its execution a thread calls the await method. The barrier does not open until the specified number of threads have arrived.
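The "cyclic" part of the name means that the same barrier object can be crossed again and again. The following is a minimal sketch of that behavior, not code from the original example; the class name BarrierReuseDemo and the method runRounds are hypothetical names chosen for illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: several threads cross the same barrier repeatedly.
class BarrierReuseDemo {
    // Runs `parties` threads through `rounds` barrier cycles and returns
    // the total number of successful barrier crossings.
    static int runRounds(int parties, int rounds) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        AtomicInteger crossings = new AtomicInteger();

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        barrier.await();             // same barrier object every round
                        crossings.incrementAndGet(); // counted only after a release
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // Another party failed; give up on the remaining rounds.
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return crossings.get();
    }

    public static void main(String[] args) {
        System.out.println(runRounds(3, 4)); // prints 12 (3 threads x 4 rounds)
    }
}
```

A CountDownLatch could not be used this way, because its count cannot be reset once it reaches zero.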
When you create a CyclicBarrier, you specify the number of threads that must arrive before the barrier opens:
CyclicBarrier barrier = new CyclicBarrier(2);
To place the barrier at a given point in a thread's execution, call the CyclicBarrier's await method there:
barrier.await();
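To see the blocking behavior end to end, here is a small self-contained sketch. The class BarrierBasics and its run method are hypothetical names, not part of the original example. Every thread records "before", waits at the barrier, and then records "after".

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;

// Hypothetical demo class: no thread gets past await() until all have arrived.
class BarrierBasics {
    static List<String> run(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        List<String> events = new CopyOnWriteArrayList<>();

        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    events.add("before");
                    barrier.await();      // blocks until all parties have called await()
                    events.add("after");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    events.add("broken");
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        // prints [before, before, before, after, after, after]
        System.out.println(run(3));
    }
}
```

Because a thread only returns from await() after every party has called it, all the "before" entries are recorded ahead of any "after" entry, whatever order the threads are scheduled in.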
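The await method also has an overload that takes a maximum time to wait. If that time elapses before enough threads have arrived, the waiting thread stops waiting and receives a TimeoutException. The barrier is then in the broken state, and any other threads waiting on it receive a BrokenBarrierException: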
barrier.await(10, TimeUnit.SECONDS);
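What a timeout looks like in practice can be sketched as follows. The class BarrierTimeoutDemo and the method awaitAlone are hypothetical names used only for this illustration: a single thread waits at a two-party barrier that no other thread ever reaches.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: a timed await() that expires.
class BarrierTimeoutDemo {
    static String awaitAlone(long timeoutMillis) {
        CyclicBarrier barrier = new CyclicBarrier(2); // needs two parties, only one arrives
        try {
            barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return "released";
        } catch (TimeoutException e) {
            // The wait expired; the barrier is left in the broken state.
            return "timeout, broken=" + barrier.isBroken();
        } catch (BrokenBarrierException e) {
            return "broken";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitAlone(200)); // prints timeout, broken=true
    }
}
```

A broken barrier stays broken until reset() is called on it, so code that uses timed waits usually needs a recovery path for the other parties as well.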
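A thread that is blocked at the barrier stops waiting when one of the following happens: the last thread arrives and calls await, the waiting thread itself is interrupted, another waiting thread is interrupted or times out, or some thread calls the barrier's reset method.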
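As one illustration of a wait that ends without the barrier opening, the sketch below resets a barrier while a thread is blocked on it. BarrierResetDemo and resetWhileWaiting are hypothetical names, and polling getNumberWaiting() is just a simple way for this demo to know that the waiter has arrived.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: reset() releases a waiting thread with an exception.
class BarrierResetDemo {
    static String resetWhileWaiting() {
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicReference<String> outcome = new AtomicReference<>("still waiting");

        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
                outcome.set("released");
            } catch (BrokenBarrierException e) {
                outcome.set("BrokenBarrierException");
            } catch (InterruptedException e) {
                outcome.set("InterruptedException");
            }
        });
        waiter.start();

        try {
            // Wait until the waiter is actually parked at the barrier.
            while (barrier.getNumberWaiting() < 1) {
                Thread.sleep(10);
            }
            barrier.reset();   // breaks the current wait and starts a fresh generation
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(resetWhileWaiting()); // prints BrokenBarrierException
    }
}
```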
When you construct a CyclicBarrier, you can also pass a Runnable as a second argument. Once all threads have reached the barrier, this barrier action runs once, in the last thread to arrive and before any of the waiting threads are released; when it finishes, the barrier opens and the waiting threads wake up. This is very useful for partitioned work in the style of fork/join: a large task is sliced into smaller tasks, each thread computes its own partial result before the barrier, and the barrier action aggregates all the partial results once every thread is done.
    Runnable barrierAction = ... ;
    CyclicBarrier barrier = new CyclicBarrier(2, barrierAction);
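The partition-and-aggregate idea can be sketched in a few lines before looking at the larger examples. This is an illustrative sketch under assumed names (BarrierActionDemo, parallelSum), not the author's code: each worker sums its own slice of an array, and the barrier action adds the partial sums together.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class: workers compute partial sums, the barrier action combines them.
class BarrierActionDemo {
    static long parallelSum(int[] values, int workers) {
        long[] partial = new long[workers];
        AtomicLong total = new AtomicLong();

        // The action runs once, after every worker has arrived at the barrier.
        CyclicBarrier barrier = new CyclicBarrier(workers, () -> {
            long sum = 0;
            for (long p : partial) {
                sum += p;
            }
            total.set(sum);
        });

        int chunk = (values.length + workers - 1) / workers; // slice size, rounded up
        Thread[] threads = new Thread[workers];
        for (int w = 0; w < workers; w++) {
            final int id = w;
            threads[w] = new Thread(() -> {
                int from = Math.min(id * chunk, values.length);
                int to = Math.min(from + chunk, values.length);
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += values[i];
                }
                partial[id] = sum;   // written before await(), so visible to the action
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[w].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return total.get();
    }

    public static void main(String[] args) {
        int[] values = new int[100];
        for (int i = 0; i < values.length; i++) {
            values[i] = i + 1;
        }
        System.out.println(parallelSum(values, 4)); // prints 5050
    }
}
```

The workers need no extra locking around the partial array, because everything a thread does before await() is visible to the barrier action.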
    // Each barrier runs its own action once both threads have arrived.
    Runnable barrier1Action = new Runnable() {
        public void run() {
            System.out.println("BarrierAction 1 executed ");
        }
    };
    Runnable barrier2Action = new Runnable() {
        public void run() {
            System.out.println("BarrierAction 2 executed ");
        }
    };

    CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);
    CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action);

    // Both runnables share the same two barriers.
    CyclicBarrierRunnable barrierRunnable1 = new CyclicBarrierRunnable(barrier1, barrier2);
    CyclicBarrierRunnable barrierRunnable2 = new CyclicBarrierRunnable(barrier1, barrier2);

    new Thread(barrierRunnable1).start();
    new Thread(barrierRunnable2).start();
    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;

    public class CyclicBarrierRunnable implements Runnable {

        CyclicBarrier barrier1 = null;
        CyclicBarrier barrier2 = null;

        public CyclicBarrierRunnable(CyclicBarrier barrier1, CyclicBarrier barrier2) {
            this.barrier1 = barrier1;
            this.barrier2 = barrier2;
        }

        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " waiting at barrier 1");
                this.barrier1.await();   // wait for the other thread at the first barrier

                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " waiting at barrier 2");
                this.barrier2.await();   // wait for the other thread at the second barrier

                System.out.println(Thread.currentThread().getName() + " done!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
Output:
image.png
Next we implement a partitioning example with CyclicBarrier. Suppose we want to count how many times a given number occurs in a large matrix. We split the rows among several threads; each thread counts the occurrences in its own rows and then waits at the barrier. When all the threads have finished, the barrier action adds up the per-row counts to get the total.
The matrix class:
    package CyclicBarrier;

    import java.util.Random;

    public class MatrixMock {

        private int[][] data;

        // Fills a size x length matrix with random digits (0-9) and reports
        // how many times `number` occurs, so the search result can be checked.
        public MatrixMock(int size, int length, int number) {
            int counter = 0;
            data = new int[size][length];
            Random random = new Random();
            for (int i = 0; i < size; i++) {
                for (int j = 0; j < length; j++) {
                    data[i][j] = random.nextInt(10);
                    if (data[i][j] == number)
                        counter++;
                }
            }
            System.out.println("The matrix contains " + counter + " occurrences of " + number);
        }

        // Returns the requested row, or null if the index is out of range.
        public int[] getRow(int row) {
            if ((row >= 0) && (row < data.length))
                return data[row];
            return null;
        }
    }
The results class:
    package CyclicBarrier;

    public class Results {

        // data[i] holds the number of occurrences found in row i.
        private int[] data;

        public Results(int size) {
            data = new int[size];
        }

        public void setData(int position, int value) {
            data[position] = value;
        }

        public int[] getData() {
            return data;
        }
    }
The search thread:
    package CyclicBarrier;

    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;

    public class Searcher implements Runnable {

        private int firstRow;   // first row to search (inclusive)
        private int lastRow;    // last row to search (exclusive)
        private MatrixMock mock;
        private Results results;
        private int number;
        private final CyclicBarrier barrier;

        public Searcher(int firstRow, int lastRow, MatrixMock mock,
                        Results results, int number, CyclicBarrier barrier) {
            this.firstRow = firstRow;
            this.lastRow = lastRow;
            this.mock = mock;
            this.results = results;
            this.number = number;
            this.barrier = barrier;
        }

        @Override
        public void run() {
            int counter;
            System.out.println(Thread.currentThread().getName()
                    + " Searching rows " + firstRow + " to " + lastRow);
            // Count the occurrences in each assigned row and store the per-row count.
            for (int i = firstRow; i < lastRow; i++) {
                int[] row = mock.getRow(i);
                counter = 0;
                for (int j = 0; j < row.length; j++) {
                    if (row[j] == number)
                        counter++;
                }
                results.setData(i, counter);
            }
            System.out.println(Thread.currentThread().getName() + " Finished searching.");
            try {
                barrier.await();   // wait here until every searcher has finished
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " Done.");
        }
    }
The barrier action that tallies the results:
    package CyclicBarrier;

    public class Grouper implements Runnable {

        private Results results;

        public Grouper(Results results) {
            this.results = results;
        }

        // Runs once, after all searchers have reached the barrier.
        @Override
        public void run() {
            int finalResult = 0;
            System.out.println("Results are being tallied...");
            int[] data = results.getData();
            for (int number : data) {
                finalResult += number;
            }
            System.out.println(finalResult);
        }
    }
The main class for testing:
    package CyclicBarrier;

    import java.util.concurrent.CyclicBarrier;

    public class Main {

        public static void main(String[] args) {
            // Note: a 10000 x 10000 int matrix needs roughly 400 MB of heap.
            final int ROWS = 10000;               // rows in the matrix
            final int NUMBERS = 10000;            // values per row
            final int SEARCH = 5;                 // the number to look for
            final int PARTICIPANTS = 5;           // number of searcher threads
            final int LINES_PARTICIPANTS = 2000;  // rows per thread (ROWS / PARTICIPANTS)

            MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEARCH);
            Results results = new Results(ROWS);
            Grouper grouper = new Grouper(results);

            // The grouper runs as the barrier action once all searchers arrive.
            CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);

            Searcher[] searchers = new Searcher[PARTICIPANTS];
            for (int i = 0; i < PARTICIPANTS; i++) {
                searchers[i] = new Searcher(i * LINES_PARTICIPANTS,
                        i * LINES_PARTICIPANTS + LINES_PARTICIPANTS,
                        mock, results, SEARCH, barrier);
                new Thread(searchers[i]).start();
            }
        }
    }
Output:
image.png