Introduction to RxJava (Part I/III – Introduction)

In this series of posts we are going to explore the basics of RxJava:

Introduction

So, what is ReactiveX? According to its website, “ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.” In our case we are going to use the Java flavor of the library, RxJava. (The ReactiveX site has a good brief overview of the library; I recommend taking five minutes to read it.)

The definition is very short, but it relies on several concepts: asynchronous, event-based programs, and observables.

  • Asynchronous: Unlike a synchronous task, an asynchronous task does not block the caller while it runs.
  • Event-based programs: The program is designed around components that emit events and components that react to them.
  • Observable: It is based on the Observer design pattern, where a Subject (the Observable) notifies the observers that are subscribed to it.

These are not rigorous definitions (each concept deserves its own post), but they are enough for the purposes of this post.

Basically, we are going to have a component that emits events (the Observable) and a component that reacts to each event (the Observer), and all of this can happen asynchronously.
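Before bringing in the library, it may help to see the bare Observer pattern in plain Java. The following is only a minimal sketch using the JDK: the EventSource class and its subscribe and emit methods are names invented for this illustration, not RxJava's API, and everything here runs synchronously on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical names for illustration only; this is not RxJava's API.
class EventSource {
    private final List<Consumer<String>> observers = new ArrayList<>();

    // Register an observer that wants to be told about every event.
    public void subscribe(Consumer<String> observer) {
        observers.add(observer);
    }

    // Push one event to every registered observer, in subscription order.
    public void emit(String event) {
        for (Consumer<String> observer : observers) {
            observer.accept(event);
        }
    }

    public static void main(String[] args) {
        EventSource source = new EventSource();
        source.subscribe(event -> System.out.println("Observer A got " + event));
        source.subscribe(event -> System.out.println("Observer B got " + event));
        source.emit("Event 1");
        source.emit("Event 2");
    }
}
```

The subject keeps a list of observers and pushes each event to all of them. RxJava builds on the same idea and adds completion, error handling, and much more.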

We can start with a simple example.

First we need to add the dependency in order to use the library:

pom.xml:

<dependency>
   <groupId>io.reactivex.rxjava2</groupId>
   <artifactId>rxjava</artifactId>
   <version>2.2.14</version>
</dependency>

Then we create a simple Observer:

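import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import lombok.extern.slf4j.Slf4j;

@Slf4j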
public class SimpleObserver implements Observer<String> {
   @Override
   public void onSubscribe(Disposable disposable) {
       log.info("Subscribing");
   }

   @Override
   public void onNext(String s) {
       log.info("Event: {}", s);
   }

   @Override
   public void onError(Throwable throwable) {
       log.error("Error: ", throwable);
   }

   @Override
   public void onComplete() {
       log.info("There are not events");
   }
}

Note: these examples use Lombok (the @Slf4j annotation generates the log field).

We can see that we are implementing the Observer interface, which is the contract we need to follow in order to observe an Observable. For more information, see http://reactivex.io/documentation/contract.html
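The contract page linked above describes the order of the notifications: the Observer is subscribed first, then receives zero or more onNext calls, and the sequence ends with either onComplete or onError, never both. As a rough illustration, here is a plain-JDK sketch of that ordering. MiniObserver, emit, and trace are hypothetical names, and using a null item to stand in for a failure is an arbitrary choice made for this example only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the notification order; not RxJava code.
class ContractSketch {

    // Simplified stand-in for the four Observer callbacks.
    interface MiniObserver<T> {
        void onSubscribe();
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    // Emits the items in order. A null item stands in for a failure:
    // the sequence ends with onError, and onComplete is never called.
    static <T> void emit(List<T> items, MiniObserver<T> observer) {
        observer.onSubscribe();
        for (T item : items) {
            if (item == null) {
                observer.onError(new NullPointerException("null item"));
                return; // terminal: nothing is emitted after onError
            }
            observer.onNext(item);
        }
        observer.onComplete(); // terminal: reached only when no error occurred
    }

    // Records which callbacks fire, so the order can be inspected.
    static List<String> trace(List<String> items) {
        List<String> calls = new ArrayList<>();
        emit(items, new MiniObserver<String>() {
            @Override public void onSubscribe() { calls.add("onSubscribe"); }
            @Override public void onNext(String item) { calls.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable error) { calls.add("onError"); }
            @Override public void onComplete() { calls.add("onComplete"); }
        });
        return calls;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext(Event 1), onNext(Event 2), onComplete]
        System.out.println(trace(Arrays.asList("Event 1", "Event 2")));
        // prints [onSubscribe, onNext(Event 1), onError]
        System.out.println(trace(Arrays.asList("Event 1", null, "Event 3")));
    }
}
```

In the second run "Event 3" is never delivered, because an error ends the sequence.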


Now we are going to create the Observable that emits the events to each Observer. There are different ways to create an Observable; in our case we are going to use “just”.

Observable<String> eventEmitter = Observable.just("Event 1", "Event 2", "Event 3", "Event 4");

All together:

import io.reactivex.Observable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BaseMain {

   public static void main(String[] args) {
       log.info("Starting example");
       Observable<String> eventEmitter = Observable.just("Event 1", "Event 2", "Event 3", "Event 4");
       eventEmitter.subscribe(new SimpleObserver());
       log.info("Finish example");
   }
}

Notice the line:

eventEmitter.subscribe(new SimpleObserver());

In this line we are linking the Observable (eventEmitter) with the Observer (SimpleObserver).

If we run the program, the Observer produces this output:

Subscribing
Event: Event 1
Event: Event 2
Event: Event 3
Event: Event 4
There are not events

This is an image to show the interaction between the components:

In this example we are using a String as the event, but the event could be any type of object (String, Integer, List, etc.). In general we have an Observable<T> and an Observer<T>.

Also, we implemented the Observer interface just to show the different methods of the Observer and when each one is called. However, there is another way to perform the subscription: using lambdas. (If you are not familiar with lambdas, I recommend checking the documentation before continuing.)

The example above, rewritten with a lambda, is:

import io.reactivex.Observable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BaseMain {

   public static void main(String[] args) {
       log.info("Starting example");
       Observable<String> eventEmitter = Observable.just("Event 1", "Event 2", "Event 3", "Event 4");
       eventEmitter.subscribe((event)-> log.info("Using Lambda - Event: {}", event));
       log.info("Finish example");
   }
}

In this case we are subscribing to the Observable with a lambda function:

eventEmitter.subscribe((event)-> log.info("Using Lambda - Event: {}", event));

The result of the example is:

Using Lambda - Event: Event 1
Using Lambda - Event: Event 2
Using Lambda - Event: Event 3
Using Lambda - Event: Event 4

As you can see, the onSubscribe and onComplete output is missing. That is because we did not define any behavior for them. If we want the same behavior using lambdas, the example should be this:

import io.reactivex.Observable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BaseMain {

   public static void main(String[] args) {
       log.info("Starting example");
       Observable<String> eventEmitter = Observable.just("Event 1", "Event 2", "Event 3", "Event 4");
       eventEmitter.subscribe(
               (event)-> log.info("Using Lambda - Event: {}", event),
               (throwable) -> log.error("Error", throwable),
               () -> log.info("There are not events"),
               (disposable) -> log.info("Subscribing")
       );
       log.info("Finish example");
   }
}
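To get an intuition for why a few lambdas can replace a whole Observer class, here is a plain-JDK sketch of the idea: an adapter that wraps the lambdas in an observer object and uses do-nothing defaults for the callbacks that were not supplied. MiniObserver, fromLambdas, and emitAll are hypothetical names for this illustration. This is not how RxJava is implemented, and RxJava's real defaults (for errors in particular) may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch; MiniObserver and fromLambdas are not RxJava types.
class LambdaAdapterSketch {

    interface MiniObserver<T> {
        void onSubscribe();
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    // Wraps four lambdas in a single observer object.
    static <T> MiniObserver<T> fromLambdas(Consumer<T> nextHandler,
                                           Consumer<Throwable> errorHandler,
                                           Runnable completeHandler,
                                           Runnable subscribeHandler) {
        return new MiniObserver<T>() {
            @Override public void onSubscribe() { subscribeHandler.run(); }
            @Override public void onNext(T item) { nextHandler.accept(item); }
            @Override public void onError(Throwable error) { errorHandler.accept(error); }
            @Override public void onComplete() { completeHandler.run(); }
        };
    }

    // Only onNext is supplied, so the other callbacks do nothing.
    static <T> MiniObserver<T> fromLambdas(Consumer<T> nextHandler) {
        return fromLambdas(nextHandler, error -> { }, () -> { }, () -> { });
    }

    // Drives an observer through a complete, error-free sequence.
    static <T> void emitAll(List<T> items, MiniObserver<T> observer) {
        observer.onSubscribe();
        for (T item : items) {
            observer.onNext(item);
        }
        observer.onComplete();
    }

    // Collects the lines produced when only an onNext lambda is given.
    static List<String> runWithOnNextOnly(List<String> items) {
        List<String> lines = new ArrayList<>();
        MiniObserver<String> observer = fromLambdas(event -> lines.add("Event: " + event));
        emitAll(items, observer);
        return lines;
    }

    // Collects the lines produced when all four lambdas are given.
    static List<String> runWithAllCallbacks(List<String> items) {
        List<String> lines = new ArrayList<>();
        MiniObserver<String> observer = fromLambdas(
                event -> lines.add("Event: " + event),
                error -> lines.add("Error"),
                () -> lines.add("Completed"),
                () -> lines.add("Subscribing"));
        emitAll(items, observer);
        return lines;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("Event 1", "Event 2");
        // prints [Event: Event 1, Event: Event 2]
        System.out.println(runWithOnNextOnly(events));
        // prints [Subscribing, Event: Event 1, Event: Event 2, Completed]
        System.out.println(runWithAllCallbacks(events));
    }
}
```

With a single lambda only the events show up, which matches what we saw in the lambda example earlier: the subscription and completion still happen, but nothing is logged for them.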

In the next examples we are going to use lambdas instead of a class that implements the Observer interface.
Another thing that we are going to add is the “marble diagram”.

This is a way to represent the different events generated by an Observable over time.
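Real marble diagrams are drawings, but a rough text approximation can convey the idea: time flows from left to right, each emitted event is a "marble" on the line, and a vertical bar marks completion. The small helper below is only a sketch for this post. MarbleSketch and render are hypothetical names, not part of RxJava.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of RxJava.
class MarbleSketch {

    // Draws the events left to right on a timeline; '|' marks completion.
    static String render(List<String> events) {
        StringBuilder line = new StringBuilder("--");
        for (String event : events) {
            line.append('(').append(event).append(")--");
        }
        return line.append('|').toString();
    }

    public static void main(String[] args) {
        // prints --(Event 1)--(Event 2)--(Event 3)--(Event 4)--|
        System.out.println(render(Arrays.asList("Event 1", "Event 2", "Event 3", "Event 4")));
    }
}
```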

Next Post: Introduction to RxJava (Part II/III – Operators)
