Java Concurrency of CyclicBarrier (Synchronization at the Collection Point) CyclicBarrier introduces the creation of CyclicBarrier encounters CyclicBarrier after hibernating CyclicBarrier's callback thread Cycli


  • CyclicBarrier introduces
  • Creating CyclicBarrier
  • Dormant after CyclicBarrier encounter
  • CyclicBarrier's callback threads
  • A simple example of a CyclicBarrier
  • CyclicBarrier for partition programming example

CyclicBarrier introduces

The CyclicBarrier class is a synchronization helper class, similar to, but far more powerful than, CountDownLatch. CyclicBarrier literally means Cyclic Barrier. What it does is to have a group of threads arrive at a barrier (which can also be called a synchronization point) blocked until the last thread reaches the barrier, when the barrier opens and all the threads blocked by the barrier continue to work. As shown in this diagram below

image.png

The CyclicBarrier is equivalent to a barrier inserted during thread execution, depending on where the thread calls the await method, and this barrier can't be taken away until after the specified number of threads has arrived.

Creating CyclicBarrier

When you create a CyclicBarrier class, you need to specify the number of threads to wait for

CyclicBarrier barrier = new CyclicBarrier(2);

Dormant after CyclicBarrier encounter

When setting the barrier at the specified location in the thread, simply call the wait method of the CyclicBarrier.

barrier.await();

The await method also specifies the amount of time to wait. When this wait time is reached, this barrier is lifted even if not enough threads arrive

barrier.await(10, TimeUnit.SECONDS);

The following are the conditions for waiting after a terminating thread encounters a barrier.

  • Enough threads reach the barrier to automatically unblock it
  • The thread waits for the screen specified wait time after which it times out and lifts the barrier
  • Threads are interrupted, other threads are interrupted and the barrier is lifted
  • An external thread calls the CyclicBarrier.reset() method and the barrier is lifted.

CyclicBarrier's callback threads

CyclicBarrier initialization, you can pass a runnable object as an initialization parameter, when all threads have reached the barrier point, the barrier will first execute this specified runnable object as a thread, after execution, it will remove the barrier to wake up all threads, this feature is very useful to reach the partition operation, fork/join. Imagine that we let threads compute their respective results before the barrier, and then when all threads are done, we execute the tally of all computations in the callback thread, which is equivalent to a partitioning technique, where a large task is sliced up into smaller tasks each executed by other threads, and they are aggregated after execution.

Runnable      barrierAction = ... ;
CyclicBarrier barrier       = new CyclicBarrier(2, barrierAction);

A simple example of a CyclicBarrier

Runnable barrier1Action = new Runnable() {
    public void run() {
        System.out.println("BarrierAction 1 executed ");
    }
};
Runnable barrier2Action = new Runnable() {
    public void run() {
        System.out.println("BarrierAction 2 executed ");
    }
};

CyclicBarrier barrier1 = new CyclicBarrier(2, barrier1Action);
CyclicBarrier barrier2 = new CyclicBarrier(2, barrier2Action);

CyclicBarrierRunnable barrierRunnable1 =
        new CyclicBarrierRunnable(barrier1, barrier2);

CyclicBarrierRunnable barrierRunnable2 =
        new CyclicBarrierRunnable(barrier1, barrier2);

new Thread(barrierRunnable1).start();
new Thread(barrierRunnable2).start();
public class CyclicBarrierRunnable implements Runnable{

    CyclicBarrier barrier1 = null;
    CyclicBarrier barrier2 = null;

    public CyclicBarrierRunnable(
            CyclicBarrier barrier1,
            CyclicBarrier barrier2) {

        this.barrier1 = barrier1;
        this.barrier2 = barrier2;
    }

    public void run() {
        try {
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() +
                                " waiting at barrier 1");
            this.barrier1.await();

            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() +
                                " waiting at barrier 2");
            this.barrier2.await();

            System.out.println(Thread.currentThread().getName() +
                                " done!");

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

running result

image.png

CyclicBarrier for partition programming example

We implement an example of CyclicBarrier partition programming Let's assume now that an array has the number of occurrences of an element, we split out several threads to count the different rows, have them wait at the barrier when they're done, and then when all the threads are done, we can call the callback thread to calculate the total result

class of large arrays

package CyclicBarrier;

import java.util.Random;

public class MatrixMock {
    private int[][] data;
    
    public MatrixMock(int size, int length, int number) {
        int counter = 0;
        data = new int[size][length];
        Random random = new Random();
        
        for(int i=0;i<size;i++) {
            for(int j=0;j<length;j++) {
                data[i][j] = random.nextInt(10);
                if(data[i][j] == number)
                    counter++;
            }
        }
        
        System.out.println("The matrix has " + counter + "  Number to be found " + number);
    }
    
    public int[] getRow(int row) {
        if((row >= 0) && (row < data.length))
            return data[row];
        return null;
    }
}

Outcome category.

package CyclicBarrier;

public class Results {
    private int[] data;
    
    public Results(int size) {
        data = new int[size];
    }
    
    public void setData(int position, int value) {
        data[position] = value;
    }
    
    public int[] getData() {
        return data;
    }
}

Search threads

package CyclicBarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Searcher implements Runnable {

    private int firstRow;
    private int lastRow;
    
    private MatrixMock mock;
    
    private Results results;
    private int number;
    
    private final CyclicBarrier barrier;
    
    
    
    public Searcher(int firstRow, int lastRow, MatrixMock mock, Results results, int number, CyclicBarrier barrier) {
        super();
        this.firstRow = firstRow;
        this.lastRow = lastRow;
        this.mock = mock;
        this.results = results;
        this.number = number;
        this.barrier = barrier;
    }



    @Override
    public void run() {
        int counter;
        
        System.out.println(Thread.currentThread().getName() + " Data being searched" + firstRow + " " + lastRow);
        
        for(int i=firstRow;i<lastRow;i++) {
            int[] row = mock.getRow(i);
            counter = 0;
            for(int j=0;j<row.length;j++) {
                if(row[j] == number)
                    counter++;
            }
            results.setData(i, counter);
        }
        
        System.out.println(Thread.currentThread().getName() + " Finished checking.");
        
        try {
            barrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        
        System.out.println(Thread.currentThread().getName() + " Finally.");
    }

}

Callback thread statistics results

package CyclicBarrier;

public class Grouper implements Runnable {

    private Results results;
    
    public Grouper(Results results) {
        this.results = results;
    }
    
    
    @Override
    public void run() {
        int finalResult = 0;
        
        System.out.println(" Results are being tallied。。。");
        
        int[] data = results.getData();
        
        for(int number : data) {
            finalResult += number;
        }
        
        System.out.println(finalResult);
    }

}

main class test

package CyclicBarrier;

import java.util.concurrent.CyclicBarrier;

public class Main {

    public static void main(String[] args) {
        
        final int ROWS = 10000;
        final int NUMBERS = 10000;
        final int SEACHER = 5;
        final int PARTICIPANTS = 5;
        final int LINES_PARTICIPANTS = 2000;
        
        MatrixMock mock = new MatrixMock(ROWS, NUMBERS, SEACHER);
        
        Results results = new Results(ROWS);
        
        Grouper grouper = new Grouper(results);
        
        CyclicBarrier barrier = new CyclicBarrier(PARTICIPANTS, grouper);
        
        Searcher[] searchers = new Searcher[PARTICIPANTS];
        
        for(int i=0;i<PARTICIPANTS;i++) {
            searchers[i] = new Searcher(i*LINES_PARTICIPANTS, i*LINES_PARTICIPANTS + LINES_PARTICIPANTS, mock, results, 5, barrier);
            new Thread(searchers[i]).start();
        }
    }

}

running result

image.png


Recommended>>
1、Data Structures Fundamentals Refresher6 Lookup below Hash Tables
2、SpringSpringMVCMyBatiseasyUI integration advanced chapter fifteen stage summary
3、CausedbyjavanetUnknownHostExceptionopenapialipaycom
4、Day 19 Jingdong Head Small Triangle Production
5、The incurable AIDS UN spreads AIDS education with VR film

    已推荐到看一看 和朋友分享想法
    最多200字,当前共 发送

    已发送

    朋友将在看一看看到

    确定
    分享你的想法...
    取消

    分享想法到看一看

    确定
    最多200字,当前共

    发送中

    网络异常,请稍后重试

    微信扫一扫
    关注该公众号