Introduction to RxJava (Part II/III – Operators)
In this series of posts we explore the basics of RxJava:
- Introduction to RxJava (Part I/III – Introduction)
- Introduction to RxJava (Part II/III – Operators)
- Introduction to RxJava (Part III/III – Use case & How to test)
Operators
RxJava comes with many operators that let us combine and compose observables in different ways, which makes it easier to handle different kinds of events.
We are going to look at a few of them, but feel free to check the documentation for the rest (they are grouped into categories such as transformation, combination, utility, etc.).
Transformation
Map:
Transforms each event emitted by an observable and emits the result in a new observable. It is very similar to the map function of JDK 8 streams.
public static void main(String[] args) {
    log.info("Starting example");
    Observable<String> eventEmitter = Observable.just("ABC", "D", "EF");
    eventEmitter
        .map((item) -> item.length())
        .subscribe((itemSize) -> log.info("Size of event: {}", itemSize));
    log.info("Finish example");
}
In marble diagram:

And the output of the console:
Size of event: 3
Size of event: 1
Size of event: 2
In this case we transform each String into an Integer.
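For comparison, here is a minimal sketch of the same transformation written with JDK 8 streams, which the text mentions as a close relative of this operator. The class name StreamMapComparison is only illustrative and not part of the original example.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamMapComparison {
    // Maps each String to its length, mirroring the RxJava map example.
    static List<Integer> lengths() {
        return Stream.of("ABC", "D", "EF")
                .map(String::length)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Prints one line per element: sizes 3, 1 and 2.
        lengths().forEach(size -> System.out.println("Size of event: " + size));
    }
}
```

The shape of the call is the same; the difference is that a stream is pulled synchronously by the caller, while an observable pushes events to its subscribers.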
If you look at the marble diagram you can see two timelines. This is because the operator generates a new observable, so in our example we are subscribing to the second observable.
The previous example could be written in this manner:
public static void main(String[] args) {
    log.info("Starting example");
    Observable<String> eventEmitter = Observable.just("ABC", "D", "EF");
    Observable<Integer> mapEventEmitter = eventEmitter.map((item) -> item.length());
    mapEventEmitter.subscribe((itemSize) -> log.info("Size of event {}", itemSize));
    log.info("Finish example");
}
As you can see, we subscribe to mapEventEmitter and not to the origin of the events (eventEmitter).
How does this work? The short explanation is that behind the operator there is an observer, which subscribes to eventEmitter, and an Observable, which emits the transformed events and is the one we observe. (You do not need to worry about this now, but it is helpful to keep in mind when you start to see the subscribeOn or observeOn functions.)
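To make that idea more concrete, here is a toy model of an operator. MiniObservable is a hypothetical type written only for this sketch; it is not RxJava's Observable and it ignores errors, completion and disposal. It only shows the "observer inside, new observable outside" structure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class MiniObservableDemo {
    static List<Integer> collectLengths() {
        List<Integer> received = new ArrayList<>();
        MiniObservable<String> eventEmitter = MiniObservable.just("ABC", "D", "EF");
        MiniObservable<Integer> mapEventEmitter = eventEmitter.map(String::length);
        // We subscribe to the mapped observable, never to eventEmitter directly.
        mapEventEmitter.subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collectLengths()); // prints [3, 1, 2]
    }
}

// Toy model only: a hypothetical type, not RxJava's Observable.
class MiniObservable<T> {
    // What to do when somebody subscribes: push events into the given observer.
    private final Consumer<Consumer<T>> onSubscribe;

    MiniObservable(Consumer<Consumer<T>> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    @SafeVarargs
    static <T> MiniObservable<T> just(T... items) {
        return new MiniObservable<>(observer -> {
            for (T item : items) {
                observer.accept(item);
            }
        });
    }

    void subscribe(Consumer<T> observer) {
        onSubscribe.accept(observer);
    }

    // map returns a NEW observable. When it is subscribed, it subscribes an
    // internal observer to the source and forwards each transformed event.
    <R> MiniObservable<R> map(Function<T, R> transform) {
        MiniObservable<T> source = this;
        return new MiniObservable<>(downstream ->
                source.subscribe(item -> downstream.accept(transform.apply(item))));
    }
}
```

Note that nothing is emitted until subscribe is called on the mapped observable; that call is what triggers the inner subscription to the source.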
FlatMap
This one is maybe more complex to explain, so we are going to start with the ReactiveX documentation, which says: “transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable”
So let’s look at an example to review the definition:
public static void main(String[] args) {
    log.info("Starting example");
    Observable.just("A", "B", "C")
        .flatMap(letter -> findWords(letter))
        .subscribe(word -> log.info("Word: {}", word));
    log.info("Finish example");
}

private static Observable<String> findWords(final String letter) {
    return DICTIONARY.get(letter);
}

private static final Map<String, Observable<String>> DICTIONARY = new HashMap<>();
static {
    DICTIONARY.put("A", Observable.just("air", "apple"));
    DICTIONARY.put("B", Observable.just("banana"));
    DICTIONARY.put("C", Observable.just("car", "cat", "coach"));
}
We created a method that, given a letter, returns an Observable that emits words starting with that letter.
According to the documentation: “transform the items emitted by an Observable into Observables…”
private static Observable<String> findWords(final String letter)
So we have the transform function. Now we are going to explain “…then flatten the emissions from those into a single Observable”.
In the subscription we expect a String (word), so the outcome of flatMap is Observable<String> and not Observable<Observable<String>> (which is what we would get if we used map instead of flatMap). What happened? The operator flattens the result of each findWords invocation into a single Observable.
We can get another point of view using diagrams:

In a more descriptive form:

The diagram above is not a marble diagram. However, it is useful to explain how flatMap works.
You can see how the event A is transformed by the method findWords into an Observable that emits the words air and apple. The same happens with the event B, which is transformed into an Observable that emits the word banana, and something similar happens with the event C.
At the end everything is flattened into one final observable.
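The difference between map and flatMap can also be seen with plain JDK 8 streams. The following is only an analogy under that assumption (lists instead of observables, the class name StreamFlatMapComparison is illustrative): with map we keep one nested collection per letter, with flatMap we get a single flat sequence of words.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamFlatMapComparison {
    private static final Map<String, List<String>> DICTIONARY = new HashMap<>();
    static {
        DICTIONARY.put("A", Arrays.asList("air", "apple"));
        DICTIONARY.put("B", Arrays.asList("banana"));
        DICTIONARY.put("C", Arrays.asList("car", "cat", "coach"));
    }

    // map: one nested list per letter (the analogue of Observable<Observable<String>>).
    static List<List<String>> nested() {
        return Stream.of("A", "B", "C")
                .map(DICTIONARY::get)
                .collect(Collectors.toList());
    }

    // flatMap: the nested lists are flattened into one sequence of words.
    static List<String> flattened() {
        return Stream.of("A", "B", "C")
                .flatMap(letter -> DICTIONARY.get(letter).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(nested());    // [[air, apple], [banana], [car, cat, coach]]
        System.out.println(flattened()); // [air, apple, banana, car, cat, coach]
    }
}
```

Streams are synchronous, so this sketch only illustrates the flattening step and says nothing about how events from asynchronous inner observables are ordered.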
Combining
Zip
Combines multiple observables using a function and emits one event for each combination.
We are going to start with the marble diagram because it is easier to explain:

The code:
public static void main(String[] args) {
    log.info("Starting example");
    Observable<String> letterObservable = Observable.just("A", "B", "C");
    Observable<Integer> numberObservable = Observable.just(1, 2, 3);
    Observable.zip(letterObservable, numberObservable, (letter, number) -> "Letter " + letter + " combined to " + number)
        .subscribe(result -> log.info("Result: {}", result));
    log.info("Finish example");
}
The result:
Result: Letter A combined to 1
Result: Letter B combined to 2
Result: Letter C combined to 3
We can see that the zip operator receives the observables to combine (letterObservable and numberObservable) and, as its last argument, a callback function whose arguments are the events emitted by each observable.
So if we want to combine 4 observables, the zip operator should be invoked like this:
Observable.zip(obs1, obs2, obs3, obs4, (eventObs1, eventObs2, eventObs3, eventObs4) -> { /* combine the events to generate a new event */ });
Now let’s check what happens if the observables emit different numbers of events:
public static void main(String[] args) {
    log.info("Starting example");
    Observable<String> letterObservable = Observable.just("A", "B", "C");
    Observable<Integer> numberObservable = Observable.just(1, 2);
    Observable.zip(letterObservable, numberObservable, (letter, number) -> "Letter " + letter + " combined to " + number)
        .subscribe(result -> log.info("Result: {}", result));
    log.info("Finish example");
}
The result:
Result: Letter A combined to 1
Result: Letter B combined to 2
As you can see, we do not get the event C, because there is no further event from numberObservable to combine it with. That is something to take into consideration when using the zip operator.
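The pairing rule can be sketched without RxJava. The helper below is a hypothetical zip over two lists (it is not how the library implements the operator); it pairs elements by position and stops as soon as either input runs out, which is why "C" never appears.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

class ZipSketch {
    // Pairs elements by position and stops when either input is exhausted.
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        List<R> result = new ArrayList<>();
        Iterator<A> l = left.iterator();
        Iterator<B> r = right.iterator();
        while (l.hasNext() && r.hasNext()) {
            result.add(combiner.apply(l.next(), r.next()));
        }
        return result;
    }

    static List<String> example() {
        return zip(Arrays.asList("A", "B", "C"), Arrays.asList(1, 2),
                (letter, number) -> "Letter " + letter + " combined to " + number);
    }

    public static void main(String[] args) {
        // Prints the two combinations; "C" has no partner and is dropped.
        example().forEach(result -> System.out.println("Result: " + result));
    }
}
```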
Merge
This operator is going to take the output of different observables and create a new observable that emits all the events.
Marble diagrams:

Code:
public static void main(String[] args) {
    log.info("Starting example");
    Observable<String> letterObservable = Observable.just("A", "B", "C");
    Observable<User> userObservable = Observable.just(new User("1", "user 1"), new User("2", "user 2"));
    Observable.merge(letterObservable, userObservable)
        .subscribe(result -> log.info("Result: {}", result));
    log.info("Finish example");
}

class User {
    private String id;
    private String name;

    public User(String id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public String toString() {
        return "User{" +
            "name='" + name + '\'' +
            '}';
    }
}
Output
Result: A
Result: B
Result: C
Result: User{name='user 1'}
Result: User{name='user 2'}
Wait! The output of the execution is different from the marble diagram. Everything is sequential. What is happening? We are using only one thread to execute all the code, which is the default behavior of RxJava if we do not define any Schedulers.
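To see why threads matter here, the following sketch uses plain Java threads instead of RxJava Schedulers (so it is an analogy, and MergeThreadsSketch is a hypothetical name). Each source runs on its own thread and pushes its items to the same observer. All five events arrive, but their relative order now depends on timing, as in the marble diagram.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

class MergeThreadsSketch {
    // Starts one thread per source; every thread pushes into the same observer.
    static List<String> mergeOnThreads(List<List<String>> sources) {
        List<String> merged = Collections.synchronizedList(new ArrayList<>());
        Consumer<String> observer = merged::add;
        List<Thread> threads = new ArrayList<>();
        for (List<String> source : sources) {
            Thread thread = new Thread(() -> source.forEach(observer));
            threads.add(thread);
            thread.start();
        }
        try {
            for (Thread thread : threads) {
                thread.join(); // wait until every source has finished emitting
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return merged;
    }

    static List<String> example() {
        return mergeOnThreads(Arrays.asList(
                Arrays.asList("A", "B", "C"),
                Arrays.asList("user 1", "user 2")));
    }

    public static void main(String[] args) {
        // The order of the printed events may change between runs.
        example().forEach(result -> System.out.println("Result: " + result));
    }
}
```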
Simple example using operators
At this point you can understand how a marble diagram could be turned into code (or vice versa). So far so good, but what is the difference between using RxJava and not using it? We are going to show you one of the advantages with an example:
Imagine we have 5 methods:
User getUser()
TwitterUser fetchTwitterUser(User user)
FacebookUser fetchFacebookUser(User user)
List<Tweet> findTweets(TwitterUser twitterUser)
List<FacebookPost> findFacebookPosts(FacebookUser facebookUser)
Where
User getUser()
Gives us the current user, fetching some information from the database.
TwitterUser fetchTwitterUser(User user)
Fetches the Twitter information associated with the user using an external API.
FacebookUser fetchFacebookUser(User user)
Fetches the Facebook information associated with the user using an external API.
List<Tweet> findTweets(TwitterUser twitterUser)
Finds all the tweets associated with the twitterUser.
List<FacebookPost> findFacebookPosts(FacebookUser facebookUser)
Finds all the Facebook posts associated with the facebookUser.
We would like to print all the tweets and Facebook posts of the user (the user does not interact much on social media, so there is not much information).
One approach is:
public static void main(String[] args) {
    log.info("Starting example");
    User user = getUser();
    TwitterUser twitterUser = fetchTwitterUser(user); // fetching the twitter user
    List<Tweet> tweets = findTweets(twitterUser); // fetching all the tweets of the user
    FacebookUser facebookUser = fetchFacebookUser(user); // fetching the facebook user
    List<FacebookPost> facebookPosts = findFacebookPosts(facebookUser); // fetching all the facebook posts
    log.info("tweets: {}", tweets);
    log.info("facebookPosts: {}", facebookPosts);
    log.info("Finish example");
}
Basically we have

Now think about how the sequence is going to work.

In this case we assume that getUser, fetchFacebookUser and findFacebookPosts take 2 units each, while fetchTwitterUser and findTweets take 3 units each. The total time the thread needs to complete the action is 12 units (we consider the time spent logging the result to be negligible).
Now imagine that each method needs 1 unit to process/calculate/prepare the information and spends the rest of the time waiting for the resource (which does not require any action from the thread).
We have:

So the thread is blocked most of the time (58%), which means we are not making good use of the resource (CPU).
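The 58% figure follows directly from the assumed durations. Here is a small sketch of the arithmetic, using the unit values stated above (the class and method names are only illustrative):

```java
class BlockingTimeSketch {
    // Assumed durations, in units: getUser, fetchTwitterUser, findTweets,
    // fetchFacebookUser, findFacebookPosts.
    static final int[] DURATIONS = {2, 3, 3, 2, 2};
    // Assumption from the text: each method needs 1 unit of real work.
    static final int WORK_UNITS_PER_METHOD = 1;

    static int totalUnits() {
        int total = 0;
        for (int duration : DURATIONS) {
            total += duration;
        }
        return total;
    }

    static int blockedUnits() {
        return totalUnits() - DURATIONS.length * WORK_UNITS_PER_METHOD;
    }

    static long blockedPercent() {
        return Math.round(100.0 * blockedUnits() / totalUnits());
    }

    public static void main(String[] args) {
        // 12 units in total, 7 of them blocked, which rounds to 58%.
        System.out.println(totalUnits() + " units, " + blockedUnits()
                + " blocked (" + blockedPercent() + "%)");
    }
}
```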
Now, we are going to do the same exercise using RxJava.
First we are going to change the code (we are assuming that we have the same methods, but asynchronous):
Observable<User> getUser()
Observable<TwitterUser> fetchTwitterUser(User user)
Observable<FacebookUser> fetchFacebookUser(User user)
Observable<List<Tweet>> findTweets(TwitterUser twitterUser)
Observable<List<FacebookPost>> findFacebookPosts(FacebookUser facebookUser)
The code:
public static void main(String[] args) {
    log.info("Starting example");
    getUser()
        .flatMap(user -> {
            Observable<List<Tweet>> tweetsObs = fetchTwitterUser(user).flatMap(twitterUser -> findTweets(twitterUser));
            Observable<List<FacebookPost>> facebookObs = fetchFacebookUser(user).flatMap(facebookUser -> findFacebookPosts(facebookUser));
            return Observable.zip(tweetsObs, facebookObs, (tweets, facebookPosts) -> new List[] {tweets, facebookPosts});
        })
        .subscribe(result -> {
            List<Tweet> tweets = (List<Tweet>) result[0];
            List<FacebookPost> facebookPosts = (List<FacebookPost>) result[1];
            log.info("tweets: {}", tweets);
            log.info("facebookPosts: {}", facebookPosts);
        });
    log.info("Finish example");
}
The marble diagram:

Yes, the diagram is quite complicated to understand, but basically we start by getting the user, and from that moment we create two parallel sequences of observables, where each sequence starts by fetching the respective social media user and then their related information. We combine both observables and return an array with the results.
The sequence is:

Some comments before continuing…
As you can see, we can execute fetchTwitterUser + findTweets in parallel with fetchFacebookUser + findFacebookPosts. The main thread is blocked for less time; in fact, the main thread could be executing other instructions until the events come back. In this case we are assuming that the methods execute in other threads. Those threads are managed by the library and require some instructions that we left out of the example so as not to complicate it (so the example isn’t 100% accurate).
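Under the durations assumed earlier (2 units for getUser, fetchFacebookUser and findFacebookPosts, 3 units for fetchTwitterUser and findTweets), the elapsed time of the parallel version is bounded by the slower of the two branches. A rough sketch of that calculation, ignoring any overhead of the library or of thread handling:

```java
class ParallelTimeSketch {
    // Assumed durations, in units, taken from the sequential example.
    static final int GET_USER = 2;
    static final int FETCH_TWITTER_USER = 3;
    static final int FIND_TWEETS = 3;
    static final int FETCH_FACEBOOK_USER = 2;
    static final int FIND_FACEBOOK_POSTS = 2;

    static int sequentialUnits() {
        return GET_USER + FETCH_TWITTER_USER + FIND_TWEETS
                + FETCH_FACEBOOK_USER + FIND_FACEBOOK_POSTS;
    }

    // Both branches start after getUser; zip emits when the slower one finishes.
    static int parallelUnits() {
        int twitterBranch = FETCH_TWITTER_USER + FIND_TWEETS;
        int facebookBranch = FETCH_FACEBOOK_USER + FIND_FACEBOOK_POSTS;
        return GET_USER + Math.max(twitterBranch, facebookBranch);
    }

    public static void main(String[] args) {
        System.out.println("sequential: " + sequentialUnits()); // 12
        System.out.println("parallel:   " + parallelUnits());   // 8
    }
}
```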
Looking at the graphic, it may seem that using RxJava produces faster applications than not using it. However, this is a mistake: there could be another example (where we cannot parallelize) in which the version without RxJava is faster, since there is some overhead in creating the observables and observers. So assuming that RxJava is always going to be faster would be an error.
One thing we can see is that the library makes it easier to manage asynchronous events without writing our own synchronization logic between threads.
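For contrast, this is roughly what the hand-written coordination could look like with a plain ExecutorService. It is only a sketch: the five methods are hypothetical stand-ins that return Strings instead of the real User, Tweet and FacebookPost types, and a real version would also need timeouts and richer error handling.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ManualSyncSketch {
    // Hypothetical stand-ins for the blocking methods of the example.
    static String getUser() { return "user"; }
    static String fetchTwitterUser(String user) { return "twitter:" + user; }
    static String findTweets(String twitterUser) { return "tweets of " + twitterUser; }
    static String fetchFacebookUser(String user) { return "facebook:" + user; }
    static String findFacebookPosts(String facebookUser) { return "posts of " + facebookUser; }

    static String[] fetchAll() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            String user = getUser();
            // Run each social media branch on its own worker thread.
            Future<String> tweets = pool.submit(() -> findTweets(fetchTwitterUser(user)));
            Future<String> posts = pool.submit(() -> findFacebookPosts(fetchFacebookUser(user)));
            // Manual synchronization point: block until both branches are done.
            return new String[] {tweets.get(), posts.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] result = fetchAll();
        System.out.println("tweets: " + result[0]);
        System.out.println("facebookPosts: " + result[1]);
    }
}
```

The thread pool, the futures, the blocking get calls and the exception handling are the pieces that flatMap and zip expressed declaratively in the RxJava version.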
Next Post: Introduction to RxJava (Part III/III – Use case & How to test)