PCDP in Java

Parallel Programming in Java

Task Parallelism

1.1 Task Creation and Termination (Async, Finish)

S1;
finish {
async S2; // asynchronously compute sum of the lower half of the array
S3; // in parallel with S2
}
S4;

1.2 Creating Tasks in Java’s Fork/Join Framework

S1;
Fork S2;
S3;
Join S2;
S4;

1.3 Amdahl’s Law

speedup = 1 / ( 1-p + p/s )
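
As a rough worked example of the formula above, the sketch below evaluates the speedup for a few values. It assumes p is the fraction of the work that can be sped up and s is the speedup of that fraction; the class and method names are made up for illustration.

```java
final class AmdahlDemo {
    /** Overall speedup when a fraction p of the work is sped up by a factor s. */
    static double speedup(double p, double s) {
        return 1.0 / ((1.0 - p) + p / s);
    }

    /** Limit of the speedup as s grows without bound: 1 / (1 - p). */
    static double upperBound(double p) {
        return 1.0 / (1.0 - p);
    }

    public static void main(String[] args) {
        // Illustrative values only: 90% of the work is parallelizable.
        double p = 0.9;
        for (int s : new int[] {2, 8, 64}) {
            System.out.printf("p=%.1f, s=%d -> speedup %.3f%n", p, s, speedup(p, s));
        }
        System.out.printf("upper bound for p=%.1f: %.1f%n", p, upperBound(p));
    }
}
```

Even with a very large s, the sequential fraction 1-p caps the achievable speedup at 1/(1-p).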


Functional Parallelism

2.1 Futures: Tasks with Return Values

A = future { ⟨ task-with-return-value ⟩ }  // A is assigned a reference to the future object created for the task
A.get() // waits until the task has completed, then returns the task’s return value

2.2 Futures in Java’s Fork/Join Framework

  1. A Future task extends the RecursiveTask class in the Fork/Join framework, instead of RecursiveAction as in regular tasks.
  2. The compute() method of a future task must have a non-void return type, whereas it has a void return type for regular tasks.
  3. A method call like left.join() waits for the task referred to by object left in both cases, but also provides the task’s return value in the case of future tasks.
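
A minimal sketch of the three points above, assuming an array-sum task. The class name ArraySumTask and the THRESHOLD cutoff are illustrative choices, not part of the original notes.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class ArraySumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed sequential cutoff
    private final int[] data;
    private final int lo;
    private final int hi;

    ArraySumTask(int[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() { // non-void return type, unlike RecursiveAction
        if (hi - lo <= THRESHOLD) {
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        int mid = lo + (hi - lo) / 2;
        ArraySumTask left = new ArraySumTask(data, lo, mid);
        ArraySumTask right = new ArraySumTask(data, mid, hi);
        left.fork();                       // run the left half asynchronously
        long rightSum = right.compute();   // compute the right half in this task
        return left.join() + rightSum;     // join waits and returns the left value
    }

    static long sum(int[] data) {
        return ForkJoinPool.commonPool().invoke(new ArraySumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        int[] data = new int[10_000];
        for (int i = 0; i < data.length; i++) data[i] = i;
        System.out.println(sum(data));
    }
}
```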

2.3 Memoization

In dynamic programming, the memoization pattern stores { (variables), (future task) } as a lookup table. If the future task exists, look it up and get the return value by get().

2.4 Java Streams

students.stream()
.filter(s -> s.getStatus() == Student.ACTIVE)
.mapToDouble(a -> a.getAge())
.average()
.getAsDouble();
  • The pipeline can be made to execute in parallel by designating the source to be a parallel stream, i.e., by replacing students.stream() in the above code with students.parallelStream() (when students is a collection) or with Stream.of(students).parallel() (when students is an array).

2.5 Data Races and Determinism

  • Determinism:
    • functional : same input -> same output
    • structural : same input -> same computation graph
  • Data Races
    • Read-write
    • Write-write

Loop Parallelism

3.1 Parallel Loops

finish{
for(p=head;p!=null;p=p.next){
async{ compute(p) }
}
}

For example, the sequential loop

for(int i=0;i<n;i++){
A[i] = B[i] + C[i];
}

can be rewritten with a parallel stream in either of these forms:

IntStream.range(0, n).parallel().forEach(i -> A[i] = B[i] + C[i]);
A = IntStream.range(0, n).parallel().map(i -> B[i] + C[i]).toArray();

3.2 Parallel Matrix Multiplication

for(i : [0:n-1]) {
for(j : [0:n-1]) { c[i][j] = 0;
for(k : [0:n-1]) {
c[i][j] = c[i][j] + a[i][k]*b[k][j]
}
}
}

It is safe to convert for-i and for-j into forall loops, because each (i, j) iteration writes a distinct c[i][j]. The for-k loop must remain sequential to avoid a data race on c[i][j].
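
One possible Java rendering of this idea is sketched below. It parallelizes only the outer i loop with a parallel IntStream (a choice made here to keep tasks coarse) and keeps the k loop sequential; the class name is hypothetical.

```java
import java.util.stream.IntStream;

final class ParallelMatMul {
    /** Returns a * b; rows of the result are computed in parallel. */
    static double[][] multiply(double[][] a, double[][] b) {
        int rows = a.length;
        int inner = b.length;
        int cols = b[0].length;
        double[][] c = new double[rows][cols];
        // forall over i: each iteration writes only row i of c, so there is no race.
        IntStream.range(0, rows).parallel().forEach(i -> {
            for (int j = 0; j < cols; j++) {
                double sum = 0;
                // The k loop stays sequential: all k iterations update the same c[i][j].
                for (int k = 0; k < inner; k++) {
                    sum += a[i][k] * b[k][j];
                }
                c[i][j] = sum;
            }
        });
        return c;
    }

    public static void main(String[] args) {
        double[][] a = {{1, 2}, {3, 4}};
        double[][] b = {{5, 6}, {7, 8}};
        System.out.println(java.util.Arrays.deepToString(multiply(a, b)));
    }
}
```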

3.3 Barriers in Parallel Loops

forall (i : [0:n-1]) {
myId = lookup(i); // convert int to a string
print HELLO, myId;
<barrier>
print BYE, myId;
}

3.4 Iteration Grouping/Chunking in Parallel Loops

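Group (chunk) the iterations so that each task handles several consecutive iterations. This reduces the number of tasks and the overhead of creating them.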
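
A small sketch of chunking applied to the vector addition from 3.1, under the assumption that the caller picks the number of chunks. The names ChunkedVectorAdd and numChunks are illustrative.

```java
import java.util.stream.IntStream;

final class ChunkedVectorAdd {
    /** Computes a[i] = b[i] + c[i] using one parallel task per chunk of iterations. */
    static void add(double[] a, double[] b, double[] c, int numChunks) {
        int n = a.length;
        int chunkSize = (n + numChunks - 1) / numChunks; // ceiling of n / numChunks
        IntStream.range(0, numChunks).parallel().forEach(chunk -> {
            int lo = chunk * chunkSize;
            int hi = Math.min(lo + chunkSize, n);
            // Each task runs its chunk of iterations sequentially.
            for (int i = lo; i < hi; i++) {
                a[i] = b[i] + c[i];
            }
        });
    }

    public static void main(String[] args) {
        double[] b = {0, 1, 2, 3, 4};
        double[] c = {10, 10, 10, 10, 10};
        double[] a = new double[5];
        add(a, b, c, 2);
        System.out.println(java.util.Arrays.toString(a));
    }
}
```

With n iterations and numChunks chunks, only numChunks tasks are created instead of n.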


Data flow Synchronization and Pipelining

4.1 Split-phase Barriers with Java Phasers

Phaser ph = new Phaser(n);
// Create forall loop with n iterations that operate on ph
forall (i : [0:n-1]) {
print HELLO, i;
int phase = ph.arrive();

myId = lookup(i); // convert int to a string

ph.awaitAdvance(phase);
print BYE, myId;
}

Doing so enables the barrier processing to occur in parallel with the call to lookup(i), which was our desired outcome.

4.2 Point-to-Point Synchronization with Phasers

| Task 1 | Task 2 | Task 3 |
| - | - | - |
| X=A() //cost=1 | Y=B() //cost=2 | Z=C() //cost=3 |
| ph0.arrive() | ph1.arrive() | ph2.arrive() |
| ph1.wait() | ph0.wait(), ph2.wait() | ph1.wait() |
| D(X,Y) //cost=3 | E(X,Y,Z) //cost=2 | F(Y,Z) //cost=1 |

Why use a Phaser? Because it lets us split ph.arriveAndAwaitAdvance() into separate arrive and awaitAdvance operations, so each task waits only on the phasers it actually depends on.

4.3 One-Dimensional Iterative Averaging with Phasers

// Allocate array of phasers
Phaser[] ph = new Phaser[n+2]; //array of phasers
for (int i = 0; i < ph.length; i++) ph[i] = new Phaser(1);

// Main computation
forall ( i: [1:n-1]) {
for (iter: [0:nsteps-1]) {
newX[i] = (oldX[i-1] + oldX[i+1]) / 2;
ph[i].arrive();

if (i > 1) ph[i-1].awaitAdvance(iter);
if (i < n-1) ph[i + 1].awaitAdvance(iter);
swap pointers newX and oldX;
}
}

4.4 Pipeline Parallelism

while ( there is an input to be processed ) {
// wait for previous stage, if any
if (i > 0) ph[i - 1].awaitAdvance();

process input;

// signal next stage
ph[i].arrive();
}

4.5 Data Flow Parallelism

A -> C;
A, B -> D;
B -> E

async( () -> {/* Task A */; } ); // Complete task and trigger event A
async( () -> {/* Task B */; } ); // Complete task and trigger event B
asyncAwait(A, () -> {/* Task C */} ); // Only execute task after event A is triggered
asyncAwait(A, B, () -> {/* Task D */} ); // Only execute task after events A, B are triggered
asyncAwait(B, () -> {/* Task E */} ); // Only execute task after event B is triggered

The ForkJoinPool

  • Use cases: good choice for computation-heavy, divide-and-conquer problems
  • ForkJoinTask is abstract: it is usually extended through RecursiveAction (no return value) or RecursiveTask (with a return value)
  • Usages:
    execute: asynchronous execution, with no return value
    submit: asynchronous execution, with a return value that can be retrieved by task.get()
    invoke: runs the task and waits for it to complete; the returned value is obtained in the caller thread
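
The three usages can be compared side by side in a sketch like the one below. It assumes a toy Fibonacci task and the common pool, and it uses join() instead of get() only to avoid checked exceptions; all class and method names are illustrative.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

final class PoolUsageDemo {
    /** Toy divide-and-conquer task (deliberately naive). */
    static final class Fib extends RecursiveTask<Integer> {
        private final int n;

        Fib(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            if (n <= 1) return n;
            Fib f1 = new Fib(n - 1);
            f1.fork();
            return new Fib(n - 2).compute() + f1.join();
        }
    }

    /** invoke: the caller waits and receives the result directly. */
    static int viaInvoke(int n) {
        return ForkJoinPool.commonPool().invoke(new Fib(n));
    }

    /** submit: returns a handle immediately; the result is fetched later. */
    static int viaSubmit(int n) {
        ForkJoinTask<Integer> handle = ForkJoinPool.commonPool().submit(new Fib(n));
        return handle.join();
    }

    /** execute: fire-and-forget; here we keep the task object to join on it. */
    static int viaExecute(int n) {
        Fib task = new Fib(n);
        ForkJoinPool.commonPool().execute(task);
        return task.join();
    }

    public static void main(String[] args) {
        System.out.println(viaInvoke(10) + " " + viaSubmit(10) + " " + viaExecute(10));
    }
}
```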

Concurrent Programming in Java

Threads and Locks

1.1 Threads

| Operation | Code |
| - | - |
| Create a thread | Thread t = new Thread(); |
| Start executing | t.start(); |
| Join and wait | t.join(); |

  • Threads provide a low-level mechanism for parallelism
  • Since there is no restriction on which thread can perform a join on which other thread, it is possible for a programmer to erroneously create a deadlock cycle with join operations.

1.2 Structured Locks

  1. A structured lock construct is responsible for both acquiring and releasing the lock that it works with.
  2. Structured locks can be used to enforce mutual exclusion and avoid data races.
  3. Structured locks are also referred to as intrinsic locks or monitors.
    inc(){
    synchronized(A){ // Acquire A
    A.count += 1;
    } // Release A
    }
    X {buf, in, out} // A buffer with in and out pointers.
    insert(item){
    synchronized(X){
    wait while the buf is full
    buf[in] = item
    in++
    notify remove
    }
    }
    remove(){
    synchronized(X){
    wait while the buf is empty
    item = buf[out];
    out++;
    notify insert
    return item;
    }
    }
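
The buffer pseudocode above could be written in Java roughly as follows. This is a sketch under a few assumptions that go beyond the notes: a circular array, a count field to tell full from empty, and notifyAll() rather than notify(). The class names are hypothetical.

```java
final class BoundedBufferDemo {
    /** Fixed-capacity buffer guarded by its own intrinsic lock. */
    static final class BoundedBuffer<T> {
        private final Object[] buf;
        private int in = 0;
        private int out = 0;
        private int count = 0;

        BoundedBuffer(int capacity) {
            buf = new Object[capacity];
        }

        synchronized void insert(T item) throws InterruptedException {
            while (count == buf.length) wait(); // wait while the buffer is full
            buf[in] = item;
            in = (in + 1) % buf.length;
            count++;
            notifyAll(); // wake up threads waiting in remove()
        }

        @SuppressWarnings("unchecked")
        synchronized T remove() throws InterruptedException {
            while (count == 0) wait(); // wait while the buffer is empty
            T item = (T) buf[out];
            buf[out] = null;
            out = (out + 1) % buf.length;
            count--;
            notifyAll(); // wake up threads waiting in insert()
            return item;
        }
    }

    /** Sends 1..n through a small buffer and returns the sum seen by the consumer. */
    static int transferSum(int n) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(4);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) buffer.insert(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += buffer.remove();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transferSum(100));
    }
}
```

The wait calls sit in while loops so that a woken thread rechecks the condition before proceeding.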

1.3 Unstructured Locks

With unstructured locks, the programmer decides explicitly where lock and unlock are called. E.g., for a linked list N1(L1) -> N2(L2) -> N3(L3) -> N4, where node Ni is guarded by lock Li:

lock(L1)
lock(L2)
work w/N1, N2
unlock(L1)
lock(L3)
work w/N2, N3
unlock(L2)
work w/N3, N4
unlock(L3)
  1. Unstructured lock supports “hand-over-hand” locking.
  2. tryLock() adds more flexibility: if the lock cannot be acquired, the thread can do other work instead of blocking.

Read-write lock:
Multiple threads may hold the read lock at the same time, but only one thread may hold the write lock, and only while no thread holds the read lock.

// Let's say there is an array. 
search(x){
readLock()
find array[i]==x
unlock()
}
update(i, y){
writeLock()
array[i]=y
unlock()
}
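
In Java, the search/update pseudocode above maps naturally onto java.util.concurrent.locks.ReentrantReadWriteLock. The sketch below is one way to write it; the wrapper class RwArray is a made-up name.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class RwArray {
    private final int[] array;
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    RwArray(int size) {
        array = new int[size];
    }

    /** Returns the first index holding x, or -1; many searches may run at once. */
    int search(int x) {
        lock.readLock().lock();
        try {
            for (int i = 0; i < array.length; i++) {
                if (array[i] == x) return i;
            }
            return -1;
        } finally {
            lock.readLock().unlock();
        }
    }

    /** Writes y at index i; excludes all readers and other writers. */
    void update(int i, int y) {
        lock.writeLock().lock();
        try {
            array[i] = y;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        RwArray a = new RwArray(4);
        a.update(2, 7);
        System.out.println(a.search(7));
    }
}
```

Releasing the lock in a finally block keeps the unstructured lock balanced even if the body throws.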

1.4 Liveness

  1. deadlock: two threads block each other
    T1{
    sync(A){
    sync(B){
    ...
    }
    }
    }
    T2{
    sync(B){
    sync(A){
    ...
    }
    }
    }
  2. livelock: threads keep executing but make no progress
    mutable int x;
    T1{
    do{
    sync(x){
    x.incr();
    r = x.get();
    }
    }while(r<2)
    }
    T2{
    do{
    sync(x){
    x.decr();
    r = x.get();
    }
    }while(r>-2)
    }
  3. starvation: a thread is perpetually denied access to a resource while other threads keep making progress

Liveness refers to a progress guarantee, i.e., deadlock freedom, livelock freedom, and starvation freedom.

1.5 Dining Philosophers

Some philosophers sit at a round table, and they repeatedly think, pick up chopsticks, eat, and put down chopsticks.

  1. with structured lock

    while(true){
    think
    sync(left){
    sync(right){
    eat
    }
    }
    }

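    This implementation can deadlock: if every philosopher picks up the left chopstick at the same time, each waits forever for the right one.

  2. with unstructured lock

    while(true){
    think
    s1 = tryLock(left)
    if(!s1) continue
    s2 = tryLock(right)
    if(!s2) { unlock(left); continue }
    eat
    unlock(right); unlock(left)
    }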

    There is no deadlock because we are using tryLock. But livelock is possible.

  3. Solution:

    1. Let at least one philosopher, E, pick up the right chopstick first and then the left
    2. The philosophers that sit on odd seats pick up left first, while the ones that sit on even seats pick up right first.
    3. Only pick up both chopsticks when both are available
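
A sketch of the first solution is shown below. It assumes chopsticks are numbered and each philosopher always locks the lower-numbered one first, which makes the last philosopher pick up right before left. ReentrantLock is used here, and the class and method names are illustrative.

```java
import java.util.concurrent.locks.ReentrantLock;

final class DiningPhilosophers {
    /** Runs the simulation and returns how many meals each philosopher ate. */
    static int[] run(int philosophers, int meals) {
        ReentrantLock[] chopsticks = new ReentrantLock[philosophers];
        for (int i = 0; i < philosophers; i++) chopsticks[i] = new ReentrantLock();
        int[] eaten = new int[philosophers];
        Thread[] threads = new Thread[philosophers];
        for (int p = 0; p < philosophers; p++) {
            final int id = p;
            int left = p;
            int right = (p + 1) % philosophers;
            // Global lock order: lower-numbered chopstick first, so no cycle can form.
            final ReentrantLock first = chopsticks[Math.min(left, right)];
            final ReentrantLock second = chopsticks[Math.max(left, right)];
            threads[p] = new Thread(() -> {
                for (int m = 0; m < meals; m++) {
                    first.lock();
                    try {
                        second.lock();
                        try {
                            eaten[id]++; // eat
                        } finally {
                            second.unlock();
                        }
                    } finally {
                        first.unlock();
                    }
                }
            });
        }
        for (Thread t : threads) t.start();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return eaten;
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run(5, 1000)));
    }
}
```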

Critical Sections and Isolation

2.1 Critical Sections

Use the isolated construct to denote critical sections.

//I transfer money via the family account to daughter
T1{
isolated{
myBalance -= 100;
familyBalance += 100;
}
}
T2{
isolated{
familyBalance -= 100;
daughterBalance += 100;
}
}

2.2 Object Based Isolation (Monitors)

A linked list

delete(cur){
isolated(cur, cur.prev, cur.next){
cur.prev.next = cur.next;
cur.next.prev = cur.prev;
}
}

The relationship between object-based isolation and monitors is that all methods in a monitor object, M1, are executed as object-based isolated constructs with a singleton object set, {M1}. Similarly, all methods in a monitor object, M2, are executed as object-based isolated constructs with a singleton object set, {M2} which has an empty intersection with {M1}.

2.3 Concurrent Spanning Tree Algorithm

compute(v){ //parallelize the spanning tree computation
for each neighbor c of v{
s <- makeParent(v, c)
if(s){
compute(c)
}
}
}
makeParent(v, c){
isolated(c){ // guards against concurrent makeParent(v1, c) and makeParent(v2, c) calls on the same vertex c
if(c.parent==null){
c.parent = v
return true;
}else{
return false;
}
}
}

2.4 Atomic Variables

A shared counter such as cur that is updated with cur = cur + 1 can be replaced by an atomic integer:

AtomicInteger cur = new AtomicInteger();
cur.getAndAdd(1); // atomic, and typically more efficient than a lock-based update

An atomic reference is updated with compareAndSet:

AtomicReference ref = new AtomicReference();
ref.compareAndSet(expected, newValue);

Its semantics can be described with isolated:

compareAndSet(expected, newValue){
isolated(this){
if(this.value == expected){
this.value = newValue
return true
}else{
return false
}
}
}

2.5 Read, Write Isolation

The main idea behind read-write isolation is to separate read accesses to shared objects from write accesses.


Actors

3.1 Actors

An Actor uses object-based isolation. It forces all accesses to an object to be isolated by default.

An actor consists of a mailbox, a set of methods, and local state.

The Actor model is reactive. It means actors can only execute methods in response to messages; these methods can read/write local state and/or send messages to other actors.

Thus, the only way to modify an object in a pure actor model is to send messages to the actor that owns that object as part of its local state. In general, messages sent to actors from different actors can be arbitrarily reordered in the system. However, in many actor models, messages sent between the same pair of actors preserve the order in which they are sent.

3.2 Actor Examples

A pipeline checks if the word consists of all lower-case characters, and then checks if the word is even length.

process(s){ // y: checks if the word consists of all lower-case characters
if(s is lower case){
y.send(s)
}
}

process(s){ // z: checks if the word is even length
if(s is even length){
z.send(s)
}
}

3.3 Sieve of Eratosthenes Algorithm

To implement the Sieve of Eratosthenes, we implement an actor pipeline:

Non-Mul-2 -> Non-Mul-3 -> Non-Mul-5 -> Non-Mul-7 -> …

A Java code sketch for the process() method for an actor responsible for filtering out multiples of the actor’s “local prime” in the Sieve of Eratosthenes is as follows:

public void process(final Object msg) {
int candidate = (Integer) msg;
// Check if the candidate is a non-multiple of the "local prime".
// For example, localPrime = 2 in the Non-Mul-2 actor
boolean nonMul = ((candidate % localPrime) != 0);
// nothing needs to be done if nonMul = false
if (nonMul) {
if (nextActor == null) {
. . . // create & start new actor with candidate as its local prime
}
else nextActor.send(msg); // forward message to next actor
}
} // process

3.4 Producer-Consumer Problem

consumer-thread(){
while(buffer is empty) {} // wait
process the item removed from buffer
}

If we solve this problem by using locks with wait-notify operations or by using object-based isolation, both approaches require low-level concurrent programming techniques to ensure correctness and maximum performance.

However, we can also solve it with actors:

consumer-actor(){
process(s){
process the item removed from buffer
tell the buffer that it is ready
}

}

3.5 Bounded Buffer Producer-Consumer Problem

[Figure: bounded buffer]


Concurrent Data Structures

4.1 Optimistic Concurrency

Atomic Integer{
get();
set();
compareAndSet();
getAndAdd(delta){
while(true){
curr = this.get();
next = curr + delta;
if(this.compareAndSet(curr, next)){ // succeeds only if no other thread changed the value
return curr;
}
}
}
}
  1. no deadlock because there is no blocking
  2. no livelock because one eventually makes progress
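
The retry loop can be tried out with java.util.concurrent.atomic.AtomicInteger. The sketch below rebuilds getAndAdd from get and compareAndSet purely for illustration (AtomicInteger already provides getAndAdd); the CasCounter name and the thread counts are assumptions.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class CasCounter {
    private final AtomicInteger value = new AtomicInteger();

    /** Optimistic update: read, compute, then publish only if nothing changed. */
    int getAndAdd(int delta) {
        while (true) {
            int curr = value.get();
            int next = curr + delta;
            if (value.compareAndSet(curr, next)) {
                return curr; // our update won
            }
            // Another thread updated the value first; retry with a fresh read.
        }
    }

    int get() {
        return value.get();
    }

    /** Increments a shared counter from several threads and returns the final value. */
    static int countWithThreads(int threads, int incrementsPerThread) {
        CasCounter counter = new CasCounter();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < incrementsPerThread; i++) counter.getAndAdd(1);
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 10_000));
    }
}
```

A failed compareAndSet means some other thread's update succeeded, which is why the system as a whole always makes progress.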

4.2 Concurrent Queue

Queue{
enq(x){
tail.next = x;
tail = x;
}
deq(){
if(empty) throw Exception;
r = head;
head = head.next;
return r;
}
}

The queue can be made safer for multi-threaded use by updating tail.next with compareAndSet:

Queue{
enq(x){
tail.next.compareAndSet()
tail = x;
}
deq(){
if(empty) throw Exception;
r = head;
head = head.next;
return r;
}
}

4.3 Linearizability

  1. Once a write has completed, all later reads return the written value (or a newer one).
  2. Each operation appears to take effect instantaneously at some point between its request and its response.

4.4 Concurrent Hash Map

ConcurrentHashMap{
get(key); // linearizable
put(key, value); // linearizable
putIfAbsent(key, value); // linearizable
clear(); // not linearizable
putAll(); // not linearizable
}

Distributed Programming in Java

Distributed Map Reduce

1.1 Introduction to Map-Reduce

  1. Map:
    (KA,VA) -> (KA1, VA1), (KA2, VA2);
    (KB,VB) -> (KB1, VB1), (KB2, VB2)
  2. Group the same intermediate key: if KA1=KB1=K, then we have (K, (VA1,VB1))
  3. Reduce: G is a user-specified function, (K, G(VA1,VB1))
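
The three steps can be illustrated on a single machine with a word count. The sketch below is a local simulation with plain Java collections, not Hadoop code; it assumes the map step emits (word, 1) and that G is integer addition, and the class name is hypothetical. It needs Java 9 or later for Map.entry.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

final class LocalMapReduce {
    static Map<String, Integer> wordCount(List<String> lines) {
        // 1. Map: each input line yields intermediate (word, 1) pairs.
        List<Map.Entry<String, Integer>> pairs = lines.stream()
                .flatMap(line -> Arrays.stream(line.trim().split("\\s+")))
                .filter(word -> !word.isEmpty())
                .map(word -> Map.entry(word, 1))
                .collect(Collectors.toList());

        // 2. Group: collect all values that share the same intermediate key.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }

        // 3. Reduce: apply G (here: sum) to each group of values.
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : grouped.entrySet()) {
            int sum = 0;
            for (int v : group.getValue()) sum += v;
            result.put(group.getKey(), sum);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(wordCount(List.of("a b a", "b c")));
    }
}
```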

1.2 Hadoop Framework

Apache Hadoop implements distributed Map-Reduce.
A distributed computer can be viewed as a large set of multicore computers connected by a network, such that each computer has multiple processor cores. Each computer has some storage.

Hadoop allows the programmer to specify map and reduce functions in Java, and takes care of all the details of generating a large number of map tasks and reduce tasks to perform the computation as well as scheduling them across a distributed computer.
Hadoop is fault tolerant in that if a node fails, the task can be re-executed with the same input elsewhere.

Hive and Pig (query languages) can express word count.
The input K-V pairs are distributed among the computers,
mapped to intermediate key-value pairs,
grouped by intermediate key,
and reduced.

1.3 Spark Framework

| Hadoop | Spark |
| - | - |
| The computers in the distributed computer use memory just to stream data from disks, perform map or reduce operations, stream the data back to the disk | The computers in the distributed computer use memory as data buffer and perform computation on them |
| K-V pairs | resilient distributed datasets |
| Map-reduce | More operations: transforms (intermediate operations), like map, filter, join…; actions (terminal operations), like reduce, collect… |
  • Spark has two innovations:

    1. lazy evaluation: intermediate operations do not need to be scheduled until the terminal operation is known
    2. caching in memory: read and write data from memory -> faster
  • Word-count example:

    1. sc = new JavaSparkContext();
    2. input = sc.textFile(inputPath);
    3. words = input.flatMap(line -> line.split(" "));
    4. pairs = words.mapToPair(word -> new Tuple2<String, Integer>(word, 1));
    5. counts = pairs.reduceByKey((a,b)->a+b);

1.4 TF-IDF Example

Term Frequency – Inverse Document Frequency: to identify documents that are most similar to each other within a large corpus.
| | D1 | D2 | … | DN | DF |
| - | - | - | - | - | - |
| Term1 | TF1,1 | TF1,2 | | TF1,N | DF1 |
| Term2 | TF2,1 | TF2,2 | | TF2,N | DF2 |

  • TFi,j - term frequency: the number of occurrences of Termi in Dj
  • DFi - document frequency: the number of documents that contain Termi
  • IDFi - inverse document frequency: N/DFi
  • weight of Termi in Dj = TFi,j * log( N/DFi )
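
As a small worked example of the weight formula, the sketch below computes it for an in-memory corpus. It assumes documents are already tokenized into word lists and uses the natural logarithm (the notes do not fix the base); the class name is hypothetical.

```java
import java.util.List;

final class TfIdfDemo {
    /** Weight of term in document docIndex: TF * log(N / DF). */
    static double weight(List<List<String>> docs, int docIndex, String term) {
        int n = docs.size();
        // TF: occurrences of the term in the chosen document.
        long tf = docs.get(docIndex).stream().filter(term::equals).count();
        // DF: number of documents that contain the term at least once.
        long df = docs.stream().filter(doc -> doc.contains(term)).count();
        if (df == 0) return 0.0; // term absent from the corpus
        return tf * Math.log((double) n / df);
    }

    public static void main(String[] args) {
        List<List<String>> docs = List.of(
                List.of("a", "b", "a"),
                List.of("b", "c"));
        System.out.println(weight(docs, 0, "a")); // term only in D1
        System.out.println(weight(docs, 0, "b")); // term in every document
    }
}
```

A term that appears in every document gets weight 0, because log(N/DF) = log(1) = 0.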

Calculate using Hadoop:

  • input: (document, {terms})
  • map: (document, {terms}) -> ((document, term), 1)
  • reduce: ((document, term), TF)
  • map: ((document, term), TF) -> (term, 1)
  • reduce: (term, DF)

1.5 Page Rank Example

for (iter=...) {
1) for each page A -> B{
contribs(B) += rank(A)/dest_count(A)
}
2) for each page B{
rank(B) = 0.15 + 0.85 * contribs(B)
}
}
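
The two steps of the loop above can be run sequentially on a small graph, as in the sketch below. It assumes every page starts with rank 1.0 and that the graph is given as a map from each page to its outgoing links; the class name and the initial rank are illustrative choices.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PageRankDemo {
    static Map<String, Double> pageRank(Map<String, List<String>> links, int iterations) {
        Map<String, Double> ranks = new TreeMap<>();
        for (String page : links.keySet()) ranks.put(page, 1.0); // assumed initial rank

        for (int iter = 0; iter < iterations; iter++) {
            Map<String, Double> contribs = new TreeMap<>();
            for (String page : links.keySet()) contribs.put(page, 0.0);

            // Step 1: each page A sends rank(A)/dest_count(A) to every page B it links to.
            for (Map.Entry<String, List<String>> entry : links.entrySet()) {
                List<String> dests = entry.getValue();
                double share = ranks.get(entry.getKey()) / dests.size();
                for (String dest : dests) {
                    contribs.merge(dest, share, Double::sum);
                }
            }

            // Step 2: recompute every rank from the contributions it received.
            for (Map.Entry<String, Double> entry : contribs.entrySet()) {
                ranks.put(entry.getKey(), 0.15 + 0.85 * entry.getValue());
            }
        }
        return ranks;
    }

    public static void main(String[] args) {
        Map<String, List<String>> links = new TreeMap<>();
        links.put("A", List.of("B", "C"));
        links.put("B", List.of("C"));
        links.put("C", List.of("A"));
        System.out.println(pageRank(links, 10));
    }
}
```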

Calculate using Spark:

contribs = links.join(ranks).values()
.flatMapToPair(dest -> (dest, rank(src)/destcount));
ranks = contribs.reduceByKey(new Sum())
.mapValues(sum -> 0.15 + sum * 0.85);

Client-server Programming

2.1 Introduction to Sockets

Client-server programming is typically used for building distributed applications with small numbers of processes.

// Communication between JVM A (client) and JVM B (server):
// On JVM B
bSocket = new ServerSocket(port);
a = bSocket.accept(); // a refers to the client socket from JVM A
a.getInputStream(); // read
a.getOutputStream(); // write

// On JVM A
aSocket = new Socket(host, port);
aSocket.getInputStream(); // read
aSocket.getOutputStream(); // write

Sockets logically connect two JVMs, but what actually happens physically is that the message has to go through the network from the client to the server and the server to the client.

2.2 Serialization/Deserialization

Serialization converts objects to bytes; deserialization converts the bytes back to objects.

Solutions:

  1. Custom serialization/deserialization
  2. XML
  3. Java serialization/deserialization
    class X implements Serializable
    transient: marks fields that should not be serialized
  4. IDL (interface definition language)
    • example: Protocol Buffers
    • pros: communicate not just in Java, but in other languages like Python
    • cons: programmers have to define the interface in .proto

2.3 Remote Method Invocation

A thread on JVM A wants to call a method of object x on JVM B and get a return value y.
[Figure: remote method invocation]
Both x and y should be serializable. Object x must be included in the RMI registry, so that it can be accessed through a global name rather than a local object reference.

In summary, a key advantage of RMI is that, once this setup is in place, method invocations across distributed processes can be implemented almost as simply as standard method invocations.

2.4 Multicast Sockets

| Unicast | Broadcast | Multicast |
| - | - | - |
| src sends message to dest through routers | src sends message to a router, and to all dests that are connected to the router in the local network | src sends message to multiple dests through cloud |
| single destination, point-to-point communication | multiple destination, in local network | multiple destination, through cloud/Internet |

In Java

s = new MulticastSocket(port);
s.joinGroup(g);
message = new DatagramPacket(); // create a message
s.send(message); // send it to the group
message2 = new DatagramPacket(); // a buffer for receiving the message
s.receive(message2);
s.leaveGroup(g);

2.5 Publish-Subscribe Model

We have some producers and consumers. Producers send messages to topics, while consumers read messages from topics. Topics work as a broker, which producers publish messages to and consumers poll messages from.

//producer
p = new KafkaProducer();
p.send(new ProducerRecord(topic, key, value));
p.close();

//consumer
c = new KafkaConsumer();
c.subscribe(topics);
records = c.poll(timeout);
for(r : records){
r.offset, r.key, r.value
}

Message Passing

3.1 Single Program Multiple Data (SPMD) model

Different nodes work on different local elements of a global variable. The interface available is MPI (Message Passing Interface).

// global view: XG, local view: XL
main(){
MPI.init()
for(i : ...){
XL[i] = XL.length * rank() + i
}
}

3.2 Point-to-Point Communication

// In rank0, s="ABCD" and send s to rank1
// In rank1, allocate buffer, and receive into buffer from rank 0, then print buffer
main(){
MPI.init()
if(rank==0){
s="ABCD"
MPI.send(s, 0, s.length, // message, start offset, number of elements
MPI.CHAR, 1, 99) // type of message, receiver rank, message tag (differentiates messages between the same sender and receiver)
} else {
buf = new char[4]
MPI.recv(buf, 0, buf.length, MPI.CHAR, 0, 99)
}
}

When sending objects, the object's class has to implement Serializable.

3.3 Message Ordering and Deadlock

| R0 | R1 | R2 | R3 |
| - | - | - | - |
| R0 wants to receive message from R1 | R1 wants to send message to R0 | R2 wants to send message to R3 | R3 wants to receive message from R2 |

Message order is only guaranteed when the sender, the receiver, the data type, and the tag are the same for two messages. In this case, the relative order in which R0 and R3 receive their messages is not guaranteed.

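| R0 | R1 |
| - | - |
| send x to R1 | send y to R0 |
| receive y from R1 | receive x from R0 |

Send and receive in MPI are blocking operations. In this case, send x to R1 will wait until a matching receive is detected; since R1 is also blocked in its own send, neither rank reaches its receive and the program deadlocks.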

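  • Solution 1: reorder the operations on R1 so that each send is matched by a receive

    | R0 | R1 |
    | - | - |
    | send x to R1 | receive x from R0 |
    | receive y from R1 | send y to R0 |
  • Solution 2: use sendrecv, a single API call that combines the send and the receive and avoids the deadlock

    | R0 | R1 |
    | - | - |
    | sendrecv(x) | sendrecv(y) |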

3.4 Non-Blocking Communications

Benefits: avoid idle time between blocking send and recv

| R0 | R1 |
| - | - |
| S1: req0 = isend | S2: req1 = irecv |
| S3 | S4 |
| wait(req0) | wait(req1) |
| S5 | S6 |

Other operations: waitAll and waitAny

3.5 Collective Communication

main(){
MPI.init()
int x[] = ...
if(rank==0) x[0] = 10

// R0 send x[0] to all the other nodes
bcast(x, 0, 1, // data, start offset, number of elements
MPI.INT, 0) // data type, root
print rank, x[0]

// R2 computes the sum of y and store the result in z
reduce(y, 0, // input, offset,
z, 0, // dest, offset,
1, // the size of input and output
MPI.INT, 2) // data type, root
if(rank==2){
print z[0]
}
}

Combining Distribution And Multi-threading

4.1 Processes and Threads

| Multiple threads in a process | Multiple processes in a node |
| - | - |
| memory or resource efficiency, due to sharing | responsiveness to, for example, JVM delays |
| responsiveness to network delays | scalability (improve throughput) |
| performance | availability (also referred to as resilience) |

4.2 Multithreaded Servers

listener = new ServerSocket(…);
while(true){
s = listener.accept(…); // A
t = new Thread( () -> { // the next request can start immediately thereafter
read file request from s.getInputStream; // B
access the file; // C
send file to s.getOutputStream; // D
});
t.start();
}

Creating a new thread for every request incurs overhead. This can be avoided by using a thread pool.

4.3 MPI and Threading

// master thread
// threads communicate without network
MPI.INIT()
create worker threads // these threads can work in parallel
MPI.SEND()
MPI.RECV()
MPI.REDUCE()
MPI.FINALIZE()
  • threading modes:
    funneled: all MPI calls are performed by one thread
    serialized: at most one MPI call is in progress at a time
    multiple: multiple MPI calls may be in progress at the same time

4.4 Distributed Actors

If we have large numbers of actors, it will be too much to run them all on a single node.

Solution: place actors on different nodes.

  1. Create a configuration file that specifies, for each actor, the host and port on which it receives messages remotely
  2. Create remote actors
  3. Look up remote actors by some kind of logical name
  4. Messages need to be serialized.

4.5 Distributed Reactive Programming

It’s best to implement distributed service-oriented architectures using asynchronous events. A key idea behind this model is to balance “push” and “pull” efficiently.

Reactive streams specification:

Flow.Publisher
Flow.Subscriber
Flow.Processor
Flow.Subscription
// publisher
pub = new SubmissionPublisher();
pub.subscribe(s1);
pub.subscribe(s2);
pub.submit(...);
// subscriber
// each subscriber has to implement onSubscribe and onNext
// In this case, at most 100 items are requested at a time, and onNext is called for each item.
onSubscribe(sn) {
sn.request(100);
}
onNext(item){
process item
if(...) sn.request(100); // under some condition, when you're ready for the next batch
}
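
The publisher and subscriber fragments above can be combined into a runnable sketch with java.util.concurrent.Flow and SubmissionPublisher (Java 9 or later). The batch-size policy, the latch used to wait for completion, and all class names are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

final class FlowDemo {
    /** Subscriber that pulls items in batches of batchSize. */
    static final class BatchSubscriber implements Flow.Subscriber<Integer> {
        private final int batchSize;
        private final List<Integer> received = new ArrayList<>();
        private final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int remainingInBatch;

        BatchSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription sn) {
            subscription = sn;
            remainingInBatch = batchSize;
            sn.request(batchSize); // "pull": ask for the first batch
        }

        @Override
        public void onNext(Integer item) { // "push": called once per item
            received.add(item);
            if (--remainingInBatch == 0) {
                remainingInBatch = batchSize;
                subscription.request(batchSize); // ready for the next batch
            }
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 0..count-1 and returns the items in the order the subscriber saw them. */
    static List<Integer> publish(int count, int batchSize) {
        BatchSubscriber subscriber = new BatchSubscriber(batchSize);
        try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
            pub.subscribe(subscriber);
            for (int i = 0; i < count; i++) pub.submit(i);
        } // close() signals onComplete once pending items are delivered
        try {
            subscriber.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(publish(10, 3));
    }
}
```

Requesting a bounded number of items at a time is what keeps a fast publisher from overwhelming a slow subscriber.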
Author: hyangjudy
Link: https://hyangjudy.github.io/2020/04/23/PCDPInJava/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.