Functional Programming (18) - Designing a Functional Library: A Parallel Computing Combinator Library


As professional programmers, we often build tool libraries for our work. A tool library is a set of functions prepared in advance for the common problems encountered in some line of work. Usually the functionality of such a library is implemented as a series of functions that operate on a few specially designed data types. Functional programming is no exception. However, functions in a functional tool library place more emphasis on function composition; for this reason functional tool libraries are generally called combinator libraries, and the functions in them are called combinators. The designers of combinator libraries share one basic design goal: larger functionality should be achievable by combining the library's functions. Combinator libraries are generally designed around a particular functional requirement or topic: first express the topic's requirements with some data types, then design a series of functions around these specialized data types, each providing a solution for one of the topic's most basic requirements. In this section we introduce this design pattern by walking through the design of a parallel computing combinator library.

We design this parallel computing library with one purpose in mind: to be able to put an ordinary computation onto a separate thread to run. That way we can run multiple computations in parallel by placing them on multiple threads at the same time. The problem statement is simple and clear, but it is worth thinking carefully about how to combine and transform computations that each run in their own execution context.

Let's start with the data type: a parallel computation should be like a container that encapsulates an ordinary computation. Let's sketch a type for it: Par[A], where A is the result type of the ordinary computation. This Par type looks much like the higher-kinded types we encountered earlier, a container holding elements of type A. Viewed this way, we can use all the functions we developed earlier for higher-kinded types to manipulate the element of type A inside the container. And if a computation is encapsulated in a Par running on another thread, we always need a way to get the result out after the computation finishes. This gives us the two most basic functions.

def unit[A](a: A): Par[A] // inject an ordinary computation into Par, lifting A to a parallel computation
def get[A](pa: Par[A]): A // extract the result of the parallel computation
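
To get a feel for how these two functions are meant to be used, here is a minimal sketch against just the signatures above; the divide-and-conquer sum and its list splitting are our own illustration, not part of the library.

// Hypothetical use of unit and get: sum the two halves of a sequence
// as two separately described parallel computations.
def sum(xs: IndexedSeq[Int]): Int =
  if (xs.length <= 1) xs.headOption.getOrElse(0)
  else {
    val (l, r) = xs.splitAt(xs.length / 2)
    val sumL: Par[Int] = unit(sum(l))  // describe the left-half sum as a Par
    val sumR: Par[Int] = unit(sum(r))  // describe the right-half sum as a Par
    get(sumL) + get(sumR)              // extract both results and combine
  }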

The next question is control over the running thread: should the programmer decide whether a computation is put on a new thread, or should every computation automatically get its own separate thread? Suppose we choose a function, called explicitly by the programmer, to request a new thread. This has two advantages: 1) it allows more flexible parallel computing strategies (for computations known to complete quickly, a new thread may be unnecessary, and each independent thread consumes resources); 2) the threading mechanism and the parallel computation are loosely coupled: Par's implementation need not understand thread management. The signature of this function is as follows.

def fork[A](pa: Par[A]): Par[A] // set up a new execution context for pa; does not change pa and still returns a Par[A]
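
As a hedged illustration of this caller-controlled choice (bigComputation is a hypothetical stand-in, not part of the library):

// The caller decides which computations deserve their own thread.
def bigComputation(): Int = (1 to 1000000).sum      // hypothetical expensive work

val cheap: Par[Int]  = unit(1 + 1)                  // no new thread requested
val costly: Par[Int] = fork(unit(bigComputation())) // explicitly forked
// Caveat: with a strict unit, bigComputation() is evaluated before fork is
// even applied; the by-name async introduced below addresses exactly this.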

Putting a computation onto a new thread can then be expressed with this function.

def async[A](a: => A): Par[A] = fork(unit(a))  // no need to know anything about Par's internals; we know fork will set up a new execution context for this computation. Note that it still returns Par[A]

Since we are after loose coupling between the threading mechanism and the parallel computation, Par does not actually run the computation; Par is merely the description of a parallel computation. fork still returns a Par: it just adds a description of the execution context and does not actually run the algorithm. If Par is only the description of a computation, then we need an actual execution mechanism to obtain the computation's result.

def run[A](pa: Par[A]): A // since the meaning of Par has changed from a container to the description of a computation, we rename get to run

Thread management, executing the computation, and everything else involved in actually running a Par will then be done inside the implementation of run.

Par can now be expressed as follows.

def unit[A](a: A): Par[A] // inject an ordinary computation into Par, lifting A to the description of a parallel computation
def fork[A](pa: Par[A]): Par[A] // set up a new execution context for pa; the returned Par must be executed with run to obtain its result
def async[A](a: => A): Par[A] = fork(unit(a))  // no need to know anything about Par's internals. Note that it still returns Par[A]
def run[A](pa: Par[A]): A // run pa and extract the result of the computation
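
To make the description-versus-execution split concrete, here is a minimal usage sketch, assuming some implementation of run (expensive is a hypothetical placeholder):

def expensive(): Int = (1 to 1000000).sum  // hypothetical stand-in

val p: Par[Int] = async(expensive())  // only a description; nothing runs yet
val result: Int = run(p)              // actual execution happens inside run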

Since Java 5 the Java API has included the java.util.concurrent package, whose ExecutorService provides support for thread management. Translated into Scala, ExecutorService and Future look like this:

class ExecutorService {
  def submit[A](a: Callable[A]): Future[A]
}
trait Future[A] {
  def get: A
  def get(timeout: Long, unit: TimeUnit): A
  def cancel(evenIfRunning: Boolean): Boolean
  def isDone: Boolean
  def isCancelled: Boolean
}

We don't need to get into the low-level details of multi-threaded programming; java.util.concurrent's ExecutorService is enough. ExecutorService lets us submit the required computation to the system in the form of a Callable; the system returns a Future immediately, and we can read the computation's result with Future.get, which blocks the calling thread. Since reading the result blocks, the timing of get matters: if we call get immediately after submitting a computation, the calling thread blocks until that computation completes, and we get no parallelism at all. Future also offers features such as status inspection and cancellation, giving the programmer more powerful and flexible control over the computation. To gain this flexibility, we change Par's return value from an A read directly (blocking) to a Future, which does not block.
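
Here is a small sketch of that submit/get pattern against the real java.util.concurrent API (the values are our own illustration):

import java.util.concurrent._

val es = Executors.newFixedThreadPool(2)
val fa: Future[Int] = es.submit(new Callable[Int] { def call: Int = 4 + 7 })
// Calling fa.get right here would block before the second task is even
// submitted: submit-get-submit-get in sequence gives no parallelism.
val fb: Future[Int] = es.submit(new Callable[Int] { def call: Int = 2 + 1 })
// Both tasks are now in flight; only now do we block for the results.
val total = fa.get + fb.get  // 14
es.shutdown()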

type Par[A] = ExecutorService => Future[A]
def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es)

The meaning of Par has now changed again, from a data type to a function: pass in an ExecutorService and get back a Future. When we execute this function with run, the system returns the Future immediately, without any waiting.
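
Since Par[A] is now just a function type, we can write a Par value out by hand; a sketch to show the shape, assuming the Par alias just defined, not part of the library itself:

import java.util.concurrent._

// A Par[Int] written explicitly as a function: given an ExecutorService,
// submit the work and hand back the resulting Future immediately.
val p: Par[Int] = (es: ExecutorService) =>
  es.submit(new Callable[Int] { def call: Int = 40 + 2 })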

Now let's implement all of these basic functions.

object par {
  import java.util.concurrent._

  type Par[A] = ExecutorService => Future[A]
  def run[A](es: ExecutorService)(pa: Par[A]): Future[A] = pa(es)
                                                  //> run: [A](es: java.util.concurrent.ExecutorService)(pa: ch7.par.Par[A])java.u
                                                  //| til.concurrent.Future[A]

  def unit[A](a: A): Par[A] = es => {
    new Future[A] {
      def get: A = a
      def isDone = true
      def isCancelled = false
      def get(timeOut: Long, timeUnit: TimeUnit): A = get
      def cancel(evenIfRunning: Boolean): Boolean = false
    }
  }                                               //> unit: [A](a: A)ch7.par.Par[A]
  def fork[A](pa: Par[A]): Par[A] = es => {
    es.submit[A](new Callable[A] {
      def call: A = run(es)(pa).get
    })
  }                                               //> fork: [A](pa: ch7.par.Par[A])ch7.par.Par[A]
  def async[A](a: => A): Par[A] = fork(unit(a))   //> async: [A](a: => A)ch7.par.Par[A]

  val a = unit(4+7)                               //> a  : ch7.par.Par[Int] = <function1>
  val b = async(2+1)                              //> b  : ch7.par.Par[Int] = <function1>
  val es = Executors.newCachedThreadPool()        //> es  : java.util.concurrent.ExecutorService = java.util.concurrent.ThreadPool
                                                  //| Executor@71be98f5[Running, pool size = 0, active threads = 0, queued tasks =
                                                  //|  0, completed tasks = 0]
  run(es)(b).get                                  //> res0: Int = 3
  run(es)(a).get                                  //> res1: Int = 11
  es.shutdown()

}

From this example we can see that thread management is provided by an existing Java tool (Executors.newCachedThreadPool); we do not need to understand the details of thread management. We have also confirmed that the threading mechanism is loosely coupled to the parallel computation Par we designed.

Note: unit does not use the ExecutorService es at all; it directly returns a Future that claims the computation is already complete (isDone = true), and this Future's get is unit's incoming parameter a. So when we use this Future's get to fetch the expression's result, the computation actually ran on the current main thread. async, via fork, chooses a new thread and submits the computation to that new execution context. Let's analyze the computation flow.

1. val a = unit(4+7): unit constructs an already completed Future with isDone = true and Future.get = 4 + 7. Since unit's parameter is strict, the expression 4 + 7 is evaluated on the main thread, and run(es)(a).get simply returns the result 11.

2. val b = async(2+1) >>> fork(unit(2+1)): run(es)(b) >>> es.submit(new Callable), and note def call: A = run(es)(pa).get inside fork. The submitted Callable itself submits another task and then blocks (blocking), waiting to read that task's result. So the first submitted Callable must hold a thread while it waits for the second submission to finish computing. If the thread pool can provide only one thread, the first Callable occupies that only thread while waiting for the second task's result; since no thread is left for the second task, it can never produce a result, and run(es)(b).get deadlocks (dead lock). See the sketch below.
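
A minimal sketch of this deadlock, assuming the par object implemented above; the line that would hang is commented out:

import java.util.concurrent._
import par._

val es1 = Executors.newFixedThreadPool(1)  // a pool with exactly one thread
val p = fork(unit(42))
// run(es1)(p).get  // never returns: the outer Callable occupies the pool's
//                  // only thread while blocking on the inner submission
es1.shutdown()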

In this section we presented a simple functional parallel combinator library design that can put a computation onto a new thread other than the main thread. Extracting the result, however, still blocks a thread (blocking). In the next section we will discuss how to implement parallel computations with some combinator functions.

