Introduction to RxJava (Part III/III – Use case & How to test)

In this series of posts we are going to explore the basics of RxJava.

Some use cases with RxJava.

There are two typical scenarios when we start dealing with observables: in one there is a dependency between the observables, and in the other they are independent. We are going to check both scenarios with examples.

Scenario: Dependency between observables

There are cases where you need to fetch different pieces of information and one request depends on the result of another invocation.

So we have Observable A, which emits Event A. To get the rest of the information we need to call another service using the data from Event A, and that call gives us an Observable B with the result.

Ex.:
We would like to get all the unread emails of a user, and we have these two functions:

public Observable<User> getUser(String userId);

public Observable<Email> findUnreadEmails(String userEmail);

In this case we can use the flatMap operator:

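public static void main(String[] args) {

   getUser("8899554")
           .flatMap(user -> findUnreadEmails(user.getEmail()))
           // findUnreadEmails emits one Email at a time, so toList() gathers them into a single list
           .toList()
           .subscribe(emailList -> {
              log.info("Emails size: {}", emailList.size());
           });

}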

Another example could be fetching an entity and updating it; for this case we combine the map and flatMap operators.

Ex.:
Given the functions:

public Observable<User> getUser(String userId);

public Observable<User> updateUser(User user);

We are going to update the user:

public static void main(String[] args) {
   String userId = "8899554";
   getUser(userId)
           .map(user -> {
               user.setStatus("ACTIVE");
               return user;
           })
           .flatMap(user -> updateUser(user))
           .subscribe(user -> {
               log.info("User {} updated", user.getName());
           });
}

Scenario: Independence between observables

In this case we have services that are independent of each other, but we need the results of all the invocations to build a final entity.

Ex.:
Given the accountId, we would like to build the Account entity, which is composed of the owner entity, a list of transactions and a list of secondary holders.

public Observable<User> getAccountOwner(String accountId);
public Observable<List<Transaction>> findLastTransactions(String accountId);
public Observable<List<User>> findSecondaryHolder(String accountId);

In this case we can use the zip operator:

public static void main(String[] args) {

   String accountId = "123456789";

   Observable<User> accountOwnerObs = getAccountOwner(accountId);
   Observable<List<Transaction>> transactionsObs = findLastTransactions(accountId);
   Observable<List<User>> secondaryHoldersObs = findSecondaryHolder(accountId);

   Observable.zip(accountOwnerObs, transactionsObs, secondaryHoldersObs, (accountOwner, transactions, secondaryHolders) -> {
     Account account = new Account();
     account.setOwner(accountOwner);
     account.setTransactions(transactions);
     account.setSecondaryHolders(secondaryHolders);
     return account;
   }).subscribe(account -> {
      log.info("Account owner name: {}", account.getOwner().getName());
      log.info("Transactions: {}", account.getTransactions().size());
      log.info("Secondary holders: {}", account.getSecondaryHolders().size());
   });
}

If Account has a constructor that takes all the parameters, we can take advantage of a constructor reference and the code becomes:

public static void main(String[] args) {

   String accountId = "123456789";

   Observable<User> accountOwnerObs = getAccountOwner(accountId);
   Observable<List<Transaction>> transactionsObs = findLastTransactions(accountId);
   Observable<List<User>> secondaryHoldersObs = findSecondaryHolder(accountId);

   Observable.zip(accountOwnerObs, transactionsObs, secondaryHoldersObs, Account::new).subscribe(account -> {
      log.info("Account owner name: {}", account.getOwner().getName());
      log.info("Transactions: {}", account.getTransactions().size());
      log.info("Secondary holders: {}", account.getSecondaryHolders().size());
   });
}

Another example is when we have different sources of information, like:

public Observable<List<Entity>> findEntityFromNetworkByDescription(String description);
public Observable<List<Entity>> findEntityFromDiskByDescription(String description);
public Observable<List<Entity>> findEntityFromDataBaseByDescription(String description);

Again we can use the zip operator to combine the three lists into one:

public static void main(String[] args) {
   String description = "Some description";
   Observable<List<Entity>> entitiesFromNetworkObs = findEntityFromNetworkByDescription(description);
   Observable<List<Entity>> entitiesFromDiskObs = findEntityFromDiskByDescription(description);
   Observable<List<Entity>> entitiesFromDataBaseObs = findEntityFromDataBaseByDescription(description);

   Observable.zip(entitiesFromNetworkObs, entitiesFromDiskObs, entitiesFromDataBaseObs,
           (entitiesFromNetwork, entitiesFromDisk, entitiesFromDataBase) -> {
               List<Entity> entities = new ArrayList<>();
               entities.addAll(entitiesFromNetwork);
               entities.addAll(entitiesFromDisk);
               entities.addAll(entitiesFromDataBase);
               return entities;
           }).subscribe(entities -> {
              log.info("Number of entities {}", entities.size());
           });
}

For more cases, the official documentation has a nice decision tree: http://reactivex.io/documentation/operators.html#tree

How to test

The RxJava library comes with a function that helps us write unit tests. This function is test(), and it provides a TestObserver object that records the events it receives.

For the following examples we need to add the test dependencies to our project:

<dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
   <scope>test</scope>
</dependency>
<dependency>
   <groupId>org.mockito</groupId>
   <artifactId>mockito-core</artifactId>
   <version>2.22.0</version>
   <scope>test</scope>
</dependency>

Let’s look at an example:

Imagine we want to write a test to drive the development of this method in the UserService class:

public Observable<User> getUserByEmail(String email);

So we write the following test:

public class BlogTest {

   @Test
   public void getUserByEmail_withValidEmail_returnObservableOfUser() {
       // setUp
       String email = "test@test.com";
       UserService instance = new UserService();

       //execution
       TestObserver<User> testObserver = instance.getUserByEmail(email).test();

       //assertion over observable event
       testObserver.assertComplete();
       testObserver.assertNoErrors();
       testObserver.assertValueCount(1);
       testObserver.dispose();

       //assertion over the entity
       User result = testObserver.values().get(0);
       assertNotNull(result);
       assertEquals("Test", result.getName());
   }
}

In the execution line we are calling the method test(), which gives us the TestObserver:

TestObserver<User> testObserver = instance.getUserByEmail(email).test();

Later in the assertion block we have:

testObserver.assertComplete();
testObserver.assertNoErrors();
testObserver.assertValueCount(1);
testObserver.dispose();

With the lines above we are asserting that the Observable completed without errors and that it produced exactly one event.

Later we look at the event itself (result) to assert that we fetched the expected entity.
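
If you want to try these assertions without writing a service first, here is a minimal sketch. It assumes RxJava 2.x on the classpath (the io.reactivex packages); the class TestObserverSketch and the method findName() are hypothetical names used only for illustration, they are not part of the examples in this post.

```java
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;

public class TestObserverSketch {

    // Hypothetical stand-in for a service call: emits one value, then completes.
    public static Observable<String> findName() {
        return Observable.just("Test");
    }

    public static void main(String[] args) {
        // test() subscribes immediately and records every event it receives
        TestObserver<String> testObserver = findName().test();

        // assertions over the recorded events
        testObserver.assertComplete();
        testObserver.assertNoErrors();
        testObserver.assertValueCount(1);

        // values() exposes the recorded items for ordinary assertions
        String result = testObserver.values().get(0);
        if (!"Test".equals(result)) {
            throw new AssertionError("Unexpected value: " + result);
        }
    }
}
```

Because Observable.just emits synchronously, all the events are already recorded when test() returns, so the assertions can run right away.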

Now, what happens if our service has another dependency that we need to mock, and that dependency returns an Observable?

Imagine that to get the user we first need to get the EmailAccount by email, and then fetch the user using the userId stored in the EmailAccount:

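public class EmailAccountRepository {

   public Observable<EmailAccount> getEmailAccount(String email) {
       // real lookup omitted; the test replaces this class with a mock
       throw new UnsupportedOperationException("Not implemented in this example");
   }
}

public class UserRepository {

   public Observable<User> getUser(String userId) {
       // real lookup omitted; the test replaces this class with a mock
       throw new UnsupportedOperationException("Not implemented in this example");
   }
}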

This is a more complex example of the test:

@Test
public void getUserByEmail_withValidEmail_returnObservableOfUser() {
   // setUp
   String email = "test@test.com";
   EmailAccountRepository emailAccountRepositoryMock = Mockito.mock(EmailAccountRepository.class);
   EmailAccount emailAccount = new EmailAccount();
   emailAccount.setUserId("123456");
   when(emailAccountRepositoryMock.getEmailAccount(anyString())).thenReturn(Observable.just(emailAccount));

   UserRepository userRepositoryMock = Mockito.mock(UserRepository.class);
   User user = new User();
   user.setName("Test");
   when(userRepositoryMock.getUser(anyString())).thenReturn(Observable.just(user));
   UserService instance = new UserService(userRepositoryMock, emailAccountRepositoryMock);

   //execution
   TestObserver<User> testObserver = instance.getUserByEmail(email).test();

   //assertion over observable event
   testObserver.assertComplete();
   testObserver.assertNoErrors();
   testObserver.assertValueCount(1);
   testObserver.dispose();

   //verify
   verify(emailAccountRepositoryMock).getEmailAccount(eq(email));
   verify(userRepositoryMock).getUser(emailAccount.getUserId());

   //assertion over the entity
   User result = testObserver.values().get(0);
   assertNotNull(result);
   assertEquals(user.getName(), result.getName());
}

And this is the implementation:

public class UserService {
   private UserRepository userRepository;
   private EmailAccountRepository emailAccountRepository;

   public UserService(final UserRepository userRepository, final EmailAccountRepository emailAccountRepository) {
       this.userRepository = userRepository;
       this.emailAccountRepository = emailAccountRepository;
   }

   public Observable<User> getUserByEmail(final String email) {
       return this.emailAccountRepository.getEmailAccount(email)
                   .flatMap(emailAccount -> this.userRepository.getUser(emailAccount.getUserId()));
   }
}

The last test we need covers the error case.

Imagine now that we want to check that the email is valid and otherwise emit an error. We can create a new test that looks like this:

@Test
public void getUserByEmail_withInValidEmail_returnObservableOfIllegalArgumentException() {
   // setUp
   EmailAccountRepository emailAccountRepositoryMock = Mockito.mock(EmailAccountRepository.class);
   UserRepository userRepositoryMock = Mockito.mock(UserRepository.class);
   UserService instance = new UserService(userRepositoryMock, emailAccountRepositoryMock);
   String wrongEmail = "test";

   //execution
   TestObserver<User> testObserver = instance.getUserByEmail(wrongEmail).test();

   //assert
   testObserver.dispose();
   testObserver.assertFailure(IllegalArgumentException.class);
   Throwable error = testObserver.errors().get(0);
   assertEquals("Illegal email", error.getMessage());
}

As you can see we changed the assertion and now we are expecting an error.

The implementation should then be:

public Observable<User> getUserByEmail(final String email) {
   if (!isValid(email)) {
       return Observable.error(new IllegalArgumentException("Illegal email"));
   }
   return this.emailAccountRepository.getEmailAccount(email)
               .flatMap(emailAccount -> this.userRepository.getUser(emailAccount.getUserId()));
}
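
The isValid method is not shown in this post. As a rough sketch only, it could be a simple pattern check like the one below. The EmailValidator class and the pattern are assumptions made for illustration; the pattern is deliberately loose and is not a complete email validator.

```java
import java.util.regex.Pattern;

public class EmailValidator {

    // Hypothetical, deliberately loose pattern: something@something.tld with no whitespace
    private static final Pattern SIMPLE_EMAIL =
            Pattern.compile("^[^@\\s]+@[^@\\s]+\\.[^@\\s]+$");

    // Returns true only for non-null strings that match the simple pattern above
    public static boolean isValid(String email) {
        return email != null && SIMPLE_EMAIL.matcher(email).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("test@test.com")); // true
        System.out.println(isValid("test"));          // false
    }
}
```

With a helper like this, the isValid(email) call in UserService could simply delegate to EmailValidator.isValid(email), and the value "test" used in the test above would be rejected.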

Finally

In this post we covered the basics of RxJava, which should be enough for you to start using it.

If you remember the definition that we gave at the beginning, we mentioned that RxJava “is a library for composing asynchronous and event-based programs”, but we barely mentioned the asynchronous side of the library in these posts. That is because the topic requires its own post.

This is a list of topics that you could research once you understand the basics of the library:

  • Thread management: subscribeOn/observeOn and the different schedulers.
  • Backpressure.
  • Cold Observable/ Hot Observable.
  • Different types of Observables (Single/Completable/Maybe/Flowable).
  • Different types of Subjects (AsyncSubject/BehaviorSubject/PublishSubject/ReplaySubject).
  • Error handling and recovery.
  • Operators.
