Rx's Magic Revealed

Introduction to RxJava

 

I guess everyone was afraid to use RxJava in production at first, and so was I. Its purpose and the way it works remained a mystery to me. Reading the source code did not add clarity, and the articles were just confusing. So let me take a shot at answering two questions, "Which problems does this technology solve better than the alternatives?" and "How does it work?", by analogy with classic Java and with simple metaphors.

Emitter

We are all familiar with the Iterator pattern.

interface Iterator<T> {
    T next();
    boolean hasNext();
}

Behind the scenes, this interface can hide any data source, and it doesn’t matter which one. Iterator completely hides the implementation details and exposes only two methods:

next - get the next element

hasNext - see if there is more data in the source

This pattern has one interesting feature: the consumer requests data and waits ("hangs on the wire") until the source delivers it. That is why the source is usually a finite (often pre-built) collection.
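To make the pull model concrete, here is a minimal sketch in plain Java. The `SlowSource` class and its `drain` helper are hypothetical names invented for this illustration, and the 10 ms sleep only stands in for slow retrieval. The point is that the consumer drives the loop and is stuck inside every `next()` call until the value is ready.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical pull-based source: the consumer asks, the source answers.
class SlowSource implements Iterator<Integer> {
    private int remaining;

    SlowSource(int count) {
        this.remaining = count;
    }

    @Override
    public boolean hasNext() {
        return remaining > 0;
    }

    @Override
    public Integer next() {
        if (remaining <= 0) {
            throw new NoSuchElementException();
        }
        try {
            Thread.sleep(10); // simulated slow retrieval; the caller waits here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return remaining--;
    }

    // The consumer drives the loop and blocks on every next() call.
    static List<Integer> drain(Iterator<Integer> source) {
        List<Integer> result = new ArrayList<>();
        while (source.hasNext()) {
            result.add(source.next());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(drain(new SlowSource(3))); // prints [3, 2, 1]
    }
}
```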

Let’s do some small refactoring.

interface Iterator<T> {
    T getNext();
    boolean isComplete();
}

I think you are now starting to see where this is going. Here is the Emitter interface from RxJava (for consumers, the same methods are duplicated in Observer):

interface Emitter<T> {
    void onNext(T value);
    void onComplete();
    void onError(Throwable error);
}

It is similar to an Iterator but works the opposite way: the source informs the consumer when new data arrives.
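The reversal can be sketched in plain Java without RxJava on the classpath. `PushEmitter` and `PushSource` below are hypothetical stand-ins that only copy the shape of the interface above; they are not the library's types. Here the source owns the loop and calls the consumer whenever a value is ready.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in with the same shape as the Emitter interface (not the RxJava type).
interface PushEmitter<T> {
    void onNext(T value);
    void onComplete();
    void onError(Throwable error);
}

class PushSource {
    // The source drives the flow: it calls the consumer when data is ready.
    static void emitRange(int from, int to, PushEmitter<Integer> consumer) {
        for (int i = from; i <= to; i++) {
            try {
                consumer.onNext(i);
            } catch (RuntimeException e) {
                consumer.onError(e); // report the failure and stop emitting
                return;
            }
        }
        consumer.onComplete();
    }

    // Collects everything the source pushes into a list of readable events.
    static List<String> collect(int from, int to) {
        List<String> events = new ArrayList<>();
        emitRange(from, to, new PushEmitter<Integer>() {
            @Override public void onNext(Integer value) { events.add("next:" + value); }
            @Override public void onComplete() { events.add("complete"); }
            @Override public void onError(Throwable error) { events.add("error:" + error.getMessage()); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(collect(1, 3)); // prints [next:1, next:2, next:3, complete]
    }
}
```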

This lets you solve any threading problems on the source side. Also, if you are building a UI, you can be sure that all the code responsible for the GUI runs sequentially. Incredibly convenient.

Sources

Now, let’s talk a little about the sources themselves. They come in multiple types:

  • Observable
  • Single
  • Maybe
  • ...

And they are like onions; remember Shrek’s speech: “... layer-by-layer”. After creating a source, you can wrap it in another source, which in turn can be wrapped in yet another, and so on (until OutOfMemoryError, that is).

Let’s wrap the answer to the question itself in a source.

Observable.just("some_secret_word");

As we know, getting the answer is quite a long operation. So we wrap the source in “that guy” who performs the calculation on a dedicated thread:

Observable.just(42)
        .subscribeOn(computation());

We also want the application to show the answer. So we wrap the source in the “next guy”, who delivers the result on the main thread:

Observable.just(42)
        .subscribeOn(computation())
        .observeOn(mainThread());

Then we run the chain and specify that we want to print the answer to the console.

Observable.just(42)
        .subscribeOn(computation())
        .observeOn(mainThread())
        .subscribe(answer -> System.out.print(answer));

So what happened?

The subscribe method is defined in Observable. It performs checks and preparation, then calls subscribeActual, which each kind of source implements in its own way.

In our case, subscribe calls subscribeActual of ObservableObserveOn, which in turn calls subscribe on the source it wraps, specifying the thread on which the results should be delivered.

Inside ObservableObserveOn sits one more wrapper, ObservableSubscribeOn. Its subscribeActual runs subscribe of the wrapped source on the given thread.

And finally, ObservableSubscribeOn wraps ObservableJust, which simply passes its value to onNext.
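The wrapping scheme described above can be imitated in a few dozen lines of plain Java. This is a rough sketch under loud assumptions: `MiniSource` and `MiniSourceDemo` are hypothetical names, the observer is reduced to a `java.util.function.Consumer`, and two single-thread executors named "worker" and "fake-main" stand in for the computation scheduler and the main thread. RxJava's real classes handle much more (errors, completion, disposal), so treat this only as a model of the layering.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical miniature of the wrapping scheme; not RxJava's real classes.
abstract class MiniSource<T> {
    // Common entry point: check the argument, then delegate to the concrete source.
    final void subscribe(Consumer<T> observer) {
        if (observer == null) {
            throw new NullPointerException("observer");
        }
        subscribeActual(observer);
    }

    protected abstract void subscribeActual(Consumer<T> observer);

    // Innermost layer: simply hands its value to the observer.
    static <T> MiniSource<T> just(T value) {
        return new MiniSource<T>() {
            @Override protected void subscribeActual(Consumer<T> observer) {
                observer.accept(value);
            }
        };
    }

    // Wrapper: subscribes to the wrapped source on the given executor.
    MiniSource<T> subscribeOn(ExecutorService executor) {
        MiniSource<T> upstream = this;
        return new MiniSource<T>() {
            @Override protected void subscribeActual(Consumer<T> observer) {
                executor.execute(() -> upstream.subscribe(observer));
            }
        };
    }

    // Wrapper: re-delivers each value to the observer on the given executor.
    MiniSource<T> observeOn(ExecutorService executor) {
        MiniSource<T> upstream = this;
        return new MiniSource<T>() {
            @Override protected void subscribeActual(Consumer<T> observer) {
                upstream.subscribe(value -> executor.execute(() -> observer.accept(value)));
            }
        };
    }
}

class MiniSourceDemo {
    // Runs just(42) through both wrappers and reports where the value arrived.
    static String run() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        ExecutorService fakeMain = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-main"));
        String[] result = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        try {
            MiniSource.just(42)
                    .subscribeOn(worker)
                    .observeOn(fakeMain)
                    .subscribe(answer -> {
                        result[0] = answer + " on " + Thread.currentThread().getName();
                        done.countDown();
                    });
            done.await(5, TimeUnit.SECONDS); // wait for the asynchronous delivery
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.shutdown();
            fakeMain.shutdown();
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 42 on fake-main
    }
}
```

Reading the chain from the bottom up mirrors the text: the outer `observeOn` layer subscribes to the `subscribeOn` layer, which subscribes to `just` on the worker thread, and the value then travels back out to the observer on the other thread.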

Of course, a plain constant is not very interesting. You can play with something more exciting; your imagination is the limit.

RxJava contains a huge number of source implementations. All of them work on the same principle, and the details are described very well in the documentation, so I will not dwell on them.

Operations

All operations on the sources are divided into two types:

Non-terminal - return a new source that wraps the original one (map, ...)

Terminal - execute the whole chain and receive the data (subscribe, ...)

Remember, nothing is executed until a terminal operation is performed. The chain can sit in memory for a lo-o-ong time without doing anything at all. That means no data is produced until someone actually asks for it.
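This laziness is easy to demonstrate with a small plain-Java model. `LazySource` and `LazyDemo` are hypothetical names made up for this sketch, and the call counter exists only to show when the producer really runs. Building the chain with `map` merely stores functions; the producer is invoked only when `subscribe` is called.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical lazy source: building the chain only stores functions.
class LazySource<T> {
    private final Supplier<T> producer;

    LazySource(Supplier<T> producer) {
        this.producer = producer;
    }

    // Non-terminal: returns a new source that wraps this one; nothing runs yet.
    <R> LazySource<R> map(Function<T, R> mapper) {
        return new LazySource<R>(() -> mapper.apply(producer.get()));
    }

    // Terminal: only now is the producer actually invoked.
    void subscribe(Consumer<T> observer) {
        observer.accept(producer.get());
    }
}

class LazyDemo {
    // Returns {calls before subscribe, calls after subscribe, received value}.
    static int[] run() {
        AtomicInteger calls = new AtomicInteger();
        LazySource<Integer> chain = new LazySource<Integer>(() -> {
            calls.incrementAndGet(); // counts how often the source really runs
            return 21;
        }).map(x -> x * 2);

        int before = calls.get();
        int[] received = new int[1];
        chain.subscribe(value -> received[0] = value);
        return new int[] {before, calls.get(), received[0]};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints 0 1 42
    }
}
```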

Dispose

Execution of the chain can be interrupted. This is done by calling dispose() on a DisposableObserver.

After that, RxJava stops executing the chain for its Observers and calls interrupt() on the threads that are no longer needed.
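The core idea of disposal can be modelled as a shared flag that the source checks before every emission. The sketch below is plain Java with hypothetical names (`SimpleDisposable`, `CountingSource`); it is single-threaded and does not attempt to model thread interruption, so it shows only how a consumer can stop a chain midway.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical cancellation handle, modelled loosely on the dispose() idea.
class SimpleDisposable {
    private final AtomicBoolean disposed = new AtomicBoolean();

    void dispose() {
        disposed.set(true);
    }

    boolean isDisposed() {
        return disposed.get();
    }
}

class CountingSource {
    // Emits 0, 1, 2, ... below limit, but stops as soon as the handle is disposed.
    static void emit(int limit, SimpleDisposable handle, Consumer<Integer> observer) {
        for (int i = 0; i < limit && !handle.isDisposed(); i++) {
            observer.accept(i);
        }
    }

    // The observer disposes the chain after it has seen three values.
    static List<Integer> takeThreeOf(int limit) {
        List<Integer> seen = new ArrayList<>();
        SimpleDisposable handle = new SimpleDisposable();
        emit(limit, handle, value -> {
            seen.add(value);
            if (seen.size() == 3) {
                handle.dispose();
            }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(takeThreeOf(1_000_000)); // prints [0, 1, 2]
    }
}
```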

Conclusion

  • RxJava makes it easy to compose requests to the network, a database, and so on, and to run them concurrently. For your users this means a faster and more responsive application.
  • RxJava contains no magic inside. You just need to define and build chains of sources.
  • RxJava is a good replacement for the Java 8 Streams API on older versions of Java. Since Java 8 is not supported on Android 4.0, Rx is the best solution there.
Volodymyr Ishchak (Cube)
Team Lead(Consultant)