#
Flow Class: Subscriber, Publisher, Subscription interfaces
This tutorial explains to you how to use Flow.Subscriber, Flow.Publisher, Flow.Subscription interfaces in Java. We have an example for you as well. If you want to see how the Flow.Processor interface could be implemented, you can take a look at the article named Flow Class: Processor interface.
Reactive applications are the applications which react to data as they occur. The core principals of the reactive applications (or systems) were defined in Reactive Manifesto and they are:
responsive
: the application respond quickly to input data;resilient
to failureelastic
: could handle easily different workloadsmessage-driven
: the messages must be queued not to overload the consumer.
Info
- Flow
is a class which define Processor, Subscriber, Publisher, Subscription interfaces. These interfaces are considered
nested interfaces
.
Starting from Java 9, an easy way of creating reactive application is by using Flow API.
We can use Processor
, Subscriber
, Publisher
, Subscription
interfaces and SubmissionPublisher
class
(this class implements Flow.Publisher).
Info
A Subscription can be shared by exactly one Publisher and one Subscriber for the purpose of mediating data exchange. The subscription is the way a Publisher (producer) can communicate with a Subscriber (consumer). A Processor is when an object acts as a Publisher and a Subscriber in the same time.
Please take a look at the following example and read carefully the comments. The code is self-explanatory.
This example is created from a simple Spring Boot application created with Spring Initializr. I am using Maven, Java 17, Spring Boot 3.1.0.
From the base application downloaded from Spring Initializr, I updated the main class and I added some new classes as below:
package com.example.demo;
public class Employee {
int id;
String name;
public Employee(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
package com.example.demo;
import java.util.concurrent.Flow;
public class MySubscriber1 implements Flow.Subscriber<Employee> {
private Flow.Subscription subscription;
// Do something when the Subscriber subscribes to a Publisher
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
// Requests 3 messages from the Publisher. Generally we use 1.
subscription.request(3);
}
// Do something when the Subscriber receive a message from the Publisher
@Override
public void onNext(Employee emp) {
System.out.println("//MySubscriber1// is processing \"" + emp.getName()+"\" data.");
// After the 1st event, the subscription is cancelled and
// MySubscriber1 will no longer process the messages published by the publisher
subscription.cancel();
}
// Do something when an error occurs
@Override
public void onError(Throwable error) {
System.out.println("Error occurred: " + error.getMessage());
}
// Do something when the Publisher stop publishing
@Override
public void onComplete() {
System.out.println("onComplete() from MySubscriber1");
}
}
package com.example.demo;
import java.util.concurrent.Flow;
public class MySubscriber2 implements Flow.Subscriber<Employee> {
private Flow.Subscription subscription;
// Do something when the Subscriber subscribes to a Publisher
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
// Requests 1 messages from the Publisher. This request(1) "starts" the Subscriber.
subscription.request(1);
}
// Do something when the Subscriber receive a message from the Publisher
@Override
public void onNext(Employee emp) {
System.out.println(emp.getName()+" data is processed by >>> MySubscriber2.");
// Requests another 1 messages from the Publisher if any available
subscription.request(1);
}
// Do something when an error occurs
@Override
public void onError(Throwable error) {
System.out.println("Error occurred: " + error.getMessage());
}
// Do something when the Publisher stop publishing
@Override
public void onComplete() {
System.out.println("onComplete() from MySubscriber2");
}
}
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.concurrent.SubmissionPublisher;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(DemoApplication.class, args);
// Define a publisher which can "emit" Employee classes
SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();
publisher.subscribe(new MySubscriber1());
publisher.subscribe(new MySubscriber2());
// Publish a list of employees to MySubscriber1 & MySubscriber2
for (int i = 1; i <= 10; i++) {
publisher.submit(new Employee(i, "EMP"+i));
}
System.out.println("HELLO from DemoApplication");
Thread.sleep(5000);
publisher.close();
Thread.sleep(1000);
}
}
When I run this code I get the following log:
HELLO from DemoApplication
EMP1 data is processed by >>> MySubscriber2.
EMP2 data is processed by >>> MySubscriber2.
EMP3 data is processed by >>> MySubscriber2.
EMP4 data is processed by >>> MySubscriber2.
EMP5 data is processed by >>> MySubscriber2.
EMP6 data is processed by >>> MySubscriber2.
EMP7 data is processed by >>> MySubscriber2.
//MySubscriber1// is processing "EMP1" data.
EMP8 data is processed by >>> MySubscriber2.
EMP9 data is processed by >>> MySubscriber2.
EMP10 data is processed by >>> MySubscriber2.
onComplete() from MySubscriber2
Process finished with exit code 0
Info
- If you cancel the subscription, the "onComplete()" event is not received.
- The subscribers process the data in ASYNC mode.
Info
Mono and Flux are both implementations of the Publisher interface.