#
Flow Class: Processor interface
This tutorial explains to you how to use Flow.Processor interface in Java. We have an example for you as well.
Reactive applications are the applications which react to data as they occur. The core principals of the reactive applications (or systems) were defined in Reactive Manifesto and they are:
responsive
: the application respond quickly to input data;resilient
to failureelastic
: could handle easily different workloadsmessage-driven
: the messages must be queued not to overload the consumer.
Info
- Flow
is a class which define Processor, Subscriber, Publisher, Subscription interfaces. These interfaces are considered
nested interface
.
Starting from Java 9, an easy way of creating reactive application is by using Flow API.
We can use Processor
, Subscriber
, Publisher
, Subscription
interfaces and SubmissionPublisher
class
(this class implements Flow.Publisher).
Info
A Subscription can be shared by exactly one Publisher and one Subscriber for the purpose of mediating data exchange. The subscription is the way a Publisher (producer) can communicate with a Subscriber (consumer). A Processor is when an object acts as a Publisher and a Subscriber in the same time.
Please take a look at the following example and read carefully the comments. The code is self-explanatory.
This example is created from a simple Spring Boot application created with Spring Initializr. I am using Maven, Java 17, Spring Boot 3.1.0.
From the base application downloaded from Spring Initializr, I updated the main class and I added some new classes as below:
package com.example.demo;
public class Employee {
int id;
String name;
public Employee(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
package com.example.demo;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class MyProcessor extends SubmissionPublisher<Integer> implements Flow.Processor<Employee, Integer> {
private Flow.Subscription subscription;
// Do something when the Processor subscribes to a Publisher
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
// Requests 1 messages from the Publisher. This request(1) "starts" the Subscriber(Processor).
subscription.request(1);
}
// Do something when the Processor receive a message from the Publisher
@Override
public void onNext(Employee emp) {
var empId = emp.getId();
System.out.println(emp.getName()+" ... data is processed by PROCESSOR >> empId="+empId);
// Emit an event (which will be handled by MySubscriber)
submit(empId);
// Requests another 1 messages from the Publisher if there is any available
subscription.request(1);
}
// Do something when an error occurs
@Override
public void onError(Throwable error) {
System.out.println("Error occurred: " + error.getMessage());
}
// Do something when the Publisher stop publishing
@Override
public void onComplete() {
System.out.println("onComplete() from Processor");
}
}
package com.example.demo;
import java.util.concurrent.Flow;
public class MySubscriber implements Flow.Subscriber<Integer> {
private Flow.Subscription subscription;
// Do something when the Subscriber subscribes to a Publisher
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
// Requests 1 messages from the Publisher(Processor).
subscription.request(1);
}
// Do something when the Subscriber receive a message from the Publisher(Processor)
@Override
public void onNext(Integer nrReceived) {
System.out.println("The result on the SUBSCRIBER for empId=" + nrReceived+" is "+nrReceived*10);
// Requests 1 messages from the Publisher(Processor).
subscription.request(1);
}
// Do something when an error occurs
@Override
public void onError(Throwable error) {
System.out.println("Error occurred: " + error.getMessage());
}
// Do something when the Processor stop publishing
@Override
public void onComplete() {
System.out.println("onComplete() from MySubscriber");
}
}
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.concurrent.SubmissionPublisher;
@SpringBootApplication
public class DemoApplication {
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(DemoApplication.class, args);
// Define a publisher which can "emit" Employee instances
SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();
// Create an instance of MyProcessor class
MyProcessor processor = new MyProcessor();
// Create an instance of MySubscriber class
MySubscriber subscriber = new MySubscriber();
// Create the processing chain
publisher.subscribe(processor);
processor.subscribe(subscriber);
// Publish a list of employees to MyProcessor instance (in this case)
for (int i = 1; i <= 5; i++) {
publisher.submit(new Employee(i, "EMP"+i));
}
System.out.println("HELLO from DemoApplication");
Thread.sleep(5000);
publisher.close();
Thread.sleep(1000);
}
}
When I run this code I get the following log:
HELLO from DemoApplication
EMP1 ... data is processed by PROCESSOR >> empId=1
EMP2 ... data is processed by PROCESSOR >> empId=2
EMP3 ... data is processed by PROCESSOR >> empId=3
EMP4 ... data is processed by PROCESSOR >> empId=4
The result on the SUBSCRIBER for empId=1 is 10
EMP5 ... data is processed by PROCESSOR >> empId=5
The result on the SUBSCRIBER for empId=2 is 20
The result on the SUBSCRIBER for empId=3 is 30
The result on the SUBSCRIBER for empId=4 is 40
The result on the SUBSCRIBER for empId=5 is 50
onComplete() from Processor
Process finished with exit code 0
Info
As you can see in the logs, the processing is done in ASYNC mode.